Created
October 26, 2024 18:30
-
-
Save wiking-at/fc30228cee8bff3cefca975b48119b28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
from datetime import datetime | |
import paho.mqtt.client as mqtt_client | |
# Door station configurations | |
DOOR_STATIONS = { | |
"10.10.2.28": { | |
"name": "Haupteingang", | |
"topic": "frigate/Haupteingang/klingeln/state" | |
}, | |
"10.10.2.27": { | |
"name": "Nebeneingang", | |
"topic": "frigate/Nebeneingang/klingeln/state" | |
} | |
} | |
# MQTT settings | |
broker = 'mqtt' | |
port = 1883 | |
client_id = f'hikvision_sipserver' | |
mqtt_username = 'hik-sip' | |
mqtt_password = 'redacted' | |
# UDP listener settings | |
localIP = "0.0.0.0" # 0.0.0.0 = all interfaces | |
localPort = 5060 | |
bufferSize = 1024 | |
def connect_mqtt(): | |
def on_connect(client, userdata, flags, rc): | |
if rc == 0: | |
print("Connected to MQTT Broker!") | |
else: | |
print("Failed to connect, return code %d\n", rc) | |
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id) | |
client.username_pw_set(mqtt_username, mqtt_password) | |
client.on_connect = on_connect | |
client.connect(broker, port) | |
return client | |
def publish(client, topic, msg): | |
result = client.publish(topic, msg) | |
status = result[0] | |
if status == 0: | |
print(f"Send `{msg}` to topic `{topic}`") | |
else: | |
print(f"Failed to send message to topic {topic}") | |
def handle_doorbell(door_ip): | |
if door_ip not in DOOR_STATIONS: | |
print(f"Unknown door station IP: {door_ip}") | |
return | |
door_config = DOOR_STATIONS[door_ip] | |
door_name = door_config["name"] | |
topic = door_config["topic"] | |
client = connect_mqtt() | |
client.loop_start() | |
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") | |
print(f"{t}: Doorbell ring detected at {door_name}") | |
publish(client, topic, "ON") | |
# ignore other udp sip frames for 5 seconds and change mqtt state back after this timeout | |
time.sleep(5) | |
publish(client, topic, "OFF") | |
client.loop_stop() | |
def main(): | |
# Create a datagram socket | |
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) | |
# Bind to address and ip | |
UDPServerSocket.bind((localIP, localPort)) | |
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") | |
print(f"{t}: UDP server up and listening on port {localPort}") | |
try: | |
while True: | |
bytesAddressPair = UDPServerSocket.recvfrom(bufferSize) | |
message = bytesAddressPair[0] | |
address = bytesAddressPair[1] | |
client_ip = address[0] | |
decoded_frame = message.decode("utf-8") | |
sipinvite = decoded_frame.startswith('INVITE sip:ManageCenter@') | |
if sipinvite: | |
UDPServerSocket.close() | |
handle_doorbell(client_ip) | |
# re-open udp socket | |
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) | |
UDPServerSocket.bind((localIP, localPort)) | |
except KeyboardInterrupt: | |
print("\nShutting down server...") | |
finally: | |
UDPServerSocket.close() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Listen for SIP invite frames from hikvision doorstations. in my case a DS-K1T342MFWX. This frame is created if you configure this script to run on a listiner which is configured as the "Management Center" and the Intercom action set to "Call Management Center". The listener then trigers MQTT messages.
I run this script in a minimal alpine container.