import os, time, json, logging, sys from copy import deepcopy from paho.mqtt import client as mqtt_client from ntfpy import NTFYServer, NTFYClient, NTFYPushMessage, NTFYUrlAttachment MQTT_BROKER_IP = os.getenv("MQTT_BROKER_IP", "localhost") MQTT_BROKER_PORT = int(os.getenv("MQTT_BROKER_PORT", 1883)) MQTT_CLIENT_ID = os.getenv("MQTT_CLIENT_ID", "frigate-notifier") MQTT_BROKER_USERNAME = os.getenv("MQTT_BROKER_USERNAME") MQTT_BROKER_PASSWORD = os.getenv("MQTT_BROKER_PASSWORD") NTFY_SERVER_URL = os.getenv("NTFY_SERVER_URL", "https://ntfy.sh") NTFY_TOPIC = os.getenv("NTFY_TOPIC", "frigate-events") NTFY_USERNAME = os.getenv("NTFY_USERNAME") NTFY_PASSWORD = os.getenv("NTFY_PASSWORD") MESSAGE_TIMEOUT = float(os.getenv("MESSAGE_TIMEOUT", 1.0)) FRIGATE_BASE_URL = os.getenv("FRIGATE_BASE_URL", "http://localhost:5000") _last_msg_time = 0.0 _seen_new = {} _entered_zones = {} logging_levels = { "DEBUG": logging.DEBUG, "INFO": logging.INFO, "ERROR": logging.ERROR, "WARNING": logging.WARNING, } logging.basicConfig(level=logging_levels.get(os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO), format='%(asctime)s %(name)s %(levelname)s: %(message)s') logging.info("Starting Frigate MQTT Notifier...") logging.info("") logging.info("FRIGATE MQTT NOTIFIER CONFIG:") logging.info("") logging.info(f"MQTT_BROKER_IP={MQTT_BROKER_IP}") logging.info(f"MQTT_BROKER_PORT={MQTT_BROKER_PORT}") logging.info(f"MQTT_CLIENT_ID={MQTT_CLIENT_ID}") logging.info(f"MQTT_BROKER_USERNAME={MQTT_BROKER_USERNAME}") logging.info(f"MQTT_BROKER_PASSWORD={MQTT_BROKER_PASSWORD}") logging.info(f"MESSAGE_TIMEOUT={MESSAGE_TIMEOUT}") logging.info(f"FRIGATE_BASE_URL={FRIGATE_BASE_URL}") logging.info(f"NTFY_SERVER_URL={NTFY_SERVER_URL}") logging.info(f"NTFY_TOPIC={NTFY_TOPIC}") logging.info(f"NTFY_USERNAME={NTFY_USERNAME}") logging.info(f"NTFY_PASSWORD={NTFY_PASSWORD}") logging.info("") logging.debug("Connecting to Ntfy...") ntfy_server = NTFYServer(NTFY_SERVER_URL) ntfy_user = NTFYUser(NTFY_USERNAME, NTFY_PASSWORD) if NTFY_USERNAME and NTFY_PASSWORD else None ntfy_client = NTFYClient(ntfy_server, NTFY_TOPIC, ntfy_user) def on_connect(client, userdata, flags, rc): if rc == 0: logging.info("Connected to MQTT Broker!") msg = NTFYPushMessage(body="Connected to MQTT Broker!",title="Frigate MQTT Notifier") ntfy_client.send_message(msg) else: logging.error(f"Connection to MQTT Broker failed (rc={rc})") msg = NTFYPushMessage(body=f"Connection to MQTT Broker failed (rc={rc})",title="Frigate MQTT Notifier") ntfy_client.send_message(msg) def get_zone_changes(event_id, current): prev = _entered_zones.setdefault(event_id, []) logging.debug(f"Event {event_id} previous zones: {', '.join(prev)}") logging.debug(f"Event {event_id} current zones: {', '.join(current)}") entered = [z for z in current if z not in prev] _entered_zones[event_id] = current.copy() logging.debug(f"Event {event_id} entered zones: {', '.join(entered)}") return entered def on_message(client, userdata, mqtt_msg): global _last_msg_time now = time.time() if now - _last_msg_time < MESSAGE_TIMEOUT: # Throttle messages logging.debug("Event message throttled") return _last_msg_time = now try: payload = json.loads(mqtt_msg.payload) except json.JSONDecodeError: logging.warning("Invalid JSON payload got from MQTT broker") return event_type = payload.get("type") data = payload.get("after", {}) eid = str(data.get("id", "")) if data.get("stationary"): logging.debug("Stationary event ignored") return label = data.get("label", "Object").capitalize() score = data.get("top_score", 0) * 100 zones = data.get("current_zones", []) snap = data.get("has_snapshot") clip = data.get("has_clip") snap_url = f"{FRIGATE_BASE_URL}/api/events/{eid}/snapshot.jpg" if snap else None clip_url = f"{FRIGATE_BASE_URL}/api/events/{eid}/clip.mp4" if clip else None if event_type == "new": if not _seen_new.get(eid): _seen_new[eid] = True _entered_zones[eid] = deepcopy(zones) logging.debug(f"New {label} event detected in {', '.join(zones)}") body = f"New {label} Detected with {score:.1f}% certainty in {', '.join(zones)}" msg = NTFYPushMessage(body, title=f"{label} Detected") if clip_url: msg.attachment = NTFYUrlAttachment(clip_url) logging.debug(f"Found Event clip URL: {clip_url}") elif snap_url: msg.attachment = NTFYUrlAttachment(snap_url) logging.debug(f"Found Event snapshot URL: {snap_url}") ntfy_client.send_message(msg) else: logging.debug("Ignoring already seen event") elif event_type == "update": new_z = get_zone_changes(eid, zones) logging.debug(f"{label} event updated in {', '.join(zones)}") if not new_z: logging.debug(f"No new zones detected for event {eid}") return body = f"{label} entered zones: {', '.join(new_z)}" msg = NTFYPushMessage(body, title=f"{label} Zone Entry") if snap_url: msg.attachment = NTFYUrlAttachment(snap_url) logging.debug(f"Found Event snapshot URL: {snap_url}") ntfy_client.send_message(msg) elif event_type == "end": body = f"{label} left view" logging.debug(f"{label} event ended") msg = NTFYPushMessage(body, title=f"{label} Left View") if clip_url: msg.attachment = NTFYUrlAttachment(clip_url) logging.debug(f"Found Event clip URL: {clip_url}") elif snap_url: msg.attachment = NTFYUrlAttachment(snap_url) logging.debug(f"Found Event snapshot URL: {snap_url}") ntfy_client.send_message(msg) client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, MQTT_CLIENT_ID) if MQTT_BROKER_USERNAME: client.username_pw_set(MQTT_BROKER_USERNAME, MQTT_BROKER_PASSWORD) client.on_connect = on_connect client.on_message = on_message try: client.connect(MQTT_BROKER_IP, MQTT_BROKER_PORT) except Exception as e: logging.error(f"Failed to connect to MQTT broker: {e}") sys.exit(1) try: client.subscribe("frigate/events") except Exception as e: logging.error(f"Failed to subscribe to MQTT topic: {e}") sys.exit(1) try: client.loop_forever() except Exception as e: logging.error(f"Failed to start MQTT client: {e}") sys.exit(1)