mirror of
https://github.com/csd4ni3l/frigate-mqtt-notifier.git
synced 2025-11-05 05:58:05 +01:00
207 lines
6.5 KiB
Python
207 lines
6.5 KiB
Python
import os, time, json, logging, sys
|
|
from copy import deepcopy
|
|
|
|
from paho.mqtt import client as mqtt_client
|
|
from ntfpy import NTFYServer, NTFYClient, NTFYPushMessage, NTFYUrlAttachment, NTFYUser
|
|
from paho.mqtt.enums import CallbackAPIVersion
|
|
|
|
MQTT_BROKER_IP = os.getenv("MQTT_BROKER_IP", "eclipse-mosquitto")
|
|
MQTT_BROKER_PORT = int(os.getenv("MQTT_BROKER_PORT", 1883))
|
|
MQTT_CLIENT_ID = os.getenv("MQTT_CLIENT_ID", "frigate-mqtt-notifier")
|
|
MQTT_BROKER_USERNAME = os.getenv("MQTT_BROKER_USERNAME")
|
|
MQTT_BROKER_PASSWORD = os.getenv("MQTT_BROKER_PASSWORD")
|
|
|
|
NTFY_SERVER_URL = os.getenv("NTFY_SERVER_URL", "https://ntfy.sh")
|
|
NTFY_TOPIC = os.getenv("NTFY_TOPIC", "frigate-events")
|
|
NTFY_USERNAME = os.getenv("NTFY_USERNAME")
|
|
NTFY_PASSWORD = os.getenv("NTFY_PASSWORD")
|
|
|
|
MESSAGE_TIMEOUT = float(os.getenv("MESSAGE_TIMEOUT", 1.0))
|
|
FRIGATE_BASE_URL = os.getenv("FRIGATE_BASE_URL", "http://frigate:5000")
|
|
|
|
_last_msg_time = 0.0
|
|
_seen_new = {}
|
|
_entered_zones = {}
|
|
|
|
logging_levels = {
|
|
"DEBUG": logging.DEBUG,
|
|
"INFO": logging.INFO,
|
|
"ERROR": logging.ERROR,
|
|
"WARNING": logging.WARNING,
|
|
}
|
|
|
|
logging.basicConfig(level=logging_levels.get(os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO), format='%(asctime)s %(name)s %(levelname)s: %(message)s')
|
|
|
|
logging.info("Starting Frigate MQTT Notifier...")
|
|
|
|
logging.info("")
|
|
logging.info("FRIGATE MQTT NOTIFIER CONFIG:")
|
|
logging.info("")
|
|
logging.info(f"MQTT_BROKER_IP={MQTT_BROKER_IP}")
|
|
logging.info(f"MQTT_BROKER_PORT={MQTT_BROKER_PORT}")
|
|
logging.info(f"MQTT_CLIENT_ID={MQTT_CLIENT_ID}")
|
|
logging.info(f"MQTT_BROKER_USERNAME={MQTT_BROKER_USERNAME}")
|
|
logging.info(f"MQTT_BROKER_PASSWORD={MQTT_BROKER_PASSWORD}")
|
|
logging.info(f"MESSAGE_TIMEOUT={MESSAGE_TIMEOUT}")
|
|
logging.info(f"FRIGATE_BASE_URL={FRIGATE_BASE_URL}")
|
|
logging.info(f"NTFY_SERVER_URL={NTFY_SERVER_URL}")
|
|
logging.info(f"NTFY_TOPIC={NTFY_TOPIC}")
|
|
logging.info(f"NTFY_USERNAME={NTFY_USERNAME}")
|
|
logging.info(f"NTFY_PASSWORD={NTFY_PASSWORD}")
|
|
logging.info("")
|
|
|
|
logging.debug("Connecting to Ntfy...")
|
|
|
|
ntfy_server = NTFYServer(NTFY_SERVER_URL)
|
|
ntfy_user = NTFYUser(NTFY_USERNAME, NTFY_PASSWORD) if NTFY_USERNAME and NTFY_PASSWORD else None
|
|
ntfy_client = NTFYClient(ntfy_server, NTFY_TOPIC, ntfy_user)
|
|
|
|
def on_connect(client, userdata, flags, rc):
|
|
if rc == 0:
|
|
logging.info("Connected to MQTT Broker!")
|
|
|
|
msg = NTFYPushMessage(message="Connected to MQTT Broker!",title="Frigate MQTT Notifier")
|
|
ntfy_client.send_message(msg)
|
|
else:
|
|
logging.error(f"Connection to MQTT Broker failed (rc={rc})")
|
|
|
|
msg = NTFYPushMessage(message=f"Connection to MQTT Broker failed (rc={rc})",title="Frigate MQTT Notifier")
|
|
ntfy_client.send_message(msg)
|
|
|
|
def get_zone_changes(event_id, current):
|
|
prev = _entered_zones.setdefault(event_id, [])
|
|
|
|
logging.debug(f"Event {event_id} previous zones: {', '.join(prev)}")
|
|
logging.debug(f"Event {event_id} current zones: {', '.join(current)}")
|
|
|
|
entered = [z for z in current if z not in prev]
|
|
|
|
_entered_zones[event_id] = current.copy()
|
|
|
|
logging.debug(f"Event {event_id} entered zones: {', '.join(entered)}")
|
|
|
|
return entered
|
|
|
|
def on_message(client, userdata, mqtt_msg):
|
|
global _last_msg_time
|
|
now = time.time()
|
|
|
|
if now - _last_msg_time < MESSAGE_TIMEOUT: # Throttle messages
|
|
logging.debug("Event message throttled")
|
|
return
|
|
|
|
_last_msg_time = now
|
|
|
|
try:
|
|
payload = json.loads(mqtt_msg.payload)
|
|
except json.JSONDecodeError:
|
|
logging.warning("Invalid JSON payload got from MQTT broker")
|
|
return
|
|
|
|
event_type = payload.get("type")
|
|
data = payload.get("after", {})
|
|
eid = str(data.get("id", ""))
|
|
|
|
if data.get("stationary"):
|
|
logging.debug("Stationary event ignored")
|
|
return
|
|
|
|
label = data.get("label", "Object").capitalize()
|
|
score = data.get("top_score", 0) * 100
|
|
zones = data.get("current_zones", [])
|
|
snap = data.get("has_snapshot")
|
|
clip = data.get("has_clip")
|
|
|
|
snap_url = f"{FRIGATE_BASE_URL}/api/events/{eid}/snapshot.jpg" if snap else None
|
|
clip_url = f"{FRIGATE_BASE_URL}/api/events/{eid}/clip.mp4" if clip else None
|
|
|
|
if event_type == "new":
|
|
if not _seen_new.get(eid):
|
|
_seen_new[eid] = True
|
|
_entered_zones[eid] = deepcopy(zones)
|
|
|
|
logging.debug(f"New {label} event detected in {', '.join(zones)}")
|
|
|
|
body = f"New {label} Detected with {score:.1f}% certainty in {', '.join(zones)}"
|
|
|
|
msg = NTFYPushMessage(body, title=f"{label} Detected")
|
|
|
|
if clip_url:
|
|
msg.attachment = NTFYUrlAttachment(clip_url)
|
|
|
|
logging.debug(f"Found Event clip URL: {clip_url}")
|
|
|
|
elif snap_url:
|
|
msg.attachment = NTFYUrlAttachment(snap_url)
|
|
|
|
logging.debug(f"Found Event snapshot URL: {snap_url}")
|
|
|
|
ntfy_client.send_message(msg)
|
|
|
|
else:
|
|
logging.debug("Ignoring already seen event")
|
|
|
|
elif event_type == "update":
|
|
new_z = get_zone_changes(eid, zones)
|
|
|
|
logging.debug(f"{label} event updated in {', '.join(zones)}")
|
|
|
|
if not new_z:
|
|
logging.debug(f"No new zones detected for event {eid}")
|
|
return
|
|
|
|
body = f"{label} entered zones: {', '.join(new_z)}"
|
|
msg = NTFYPushMessage(body, title=f"{label} Zone Entry")
|
|
|
|
if snap_url:
|
|
msg.attachment = NTFYUrlAttachment(snap_url)
|
|
|
|
logging.debug(f"Found Event snapshot URL: {snap_url}")
|
|
|
|
ntfy_client.send_message(msg)
|
|
|
|
elif event_type == "end":
|
|
body = f"{label} left view"
|
|
|
|
logging.debug(f"{label} event ended")
|
|
|
|
msg = NTFYPushMessage(body, title=f"{label} Left View")
|
|
|
|
if clip_url:
|
|
msg.attachment = NTFYUrlAttachment(clip_url)
|
|
|
|
logging.debug(f"Found Event clip URL: {clip_url}")
|
|
|
|
elif snap_url:
|
|
msg.attachment = NTFYUrlAttachment(snap_url)
|
|
|
|
logging.debug(f"Found Event snapshot URL: {snap_url}")
|
|
|
|
ntfy_client.send_message(msg)
|
|
|
|
client = mqtt_client.Client(CallbackAPIVersion.VERSION2, MQTT_CLIENT_ID)
|
|
|
|
if MQTT_BROKER_USERNAME:
|
|
client.username_pw_set(MQTT_BROKER_USERNAME, MQTT_BROKER_PASSWORD)
|
|
|
|
client.on_connect = on_connect
|
|
client.on_message = on_message
|
|
|
|
try:
|
|
client.connect(MQTT_BROKER_IP, MQTT_BROKER_PORT)
|
|
except Exception as e:
|
|
logging.error(f"Failed to connect to MQTT broker: {e}")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
client.subscribe("frigate/events")
|
|
except Exception as e:
|
|
logging.error(f"Failed to subscribe to MQTT topic: {e}")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
client.loop_forever()
|
|
except Exception as e:
|
|
logging.error(f"Failed to start MQTT client: {e}")
|
|
sys.exit(1)
|