Skip to content

Instantly share code, notes, and snippets.

@wiking-at
Created October 26, 2024 18:30
Show Gist options
  • Save wiking-at/fc30228cee8bff3cefca975b48119b28 to your computer and use it in GitHub Desktop.
Save wiking-at/fc30228cee8bff3cefca975b48119b28 to your computer and use it in GitHub Desktop.
import socket
import time
from datetime import datetime
import paho.mqtt.client as mqtt_client
# Door station configurations
DOOR_STATIONS = {
"10.10.2.28": {
"name": "Haupteingang",
"topic": "frigate/Haupteingang/klingeln/state"
},
"10.10.2.27": {
"name": "Nebeneingang",
"topic": "frigate/Nebeneingang/klingeln/state"
}
}
# MQTT settings
broker = 'mqtt'
port = 1883
client_id = f'hikvision_sipserver'
mqtt_username = 'hik-sip'
mqtt_password = 'redacted'
# UDP listener settings
localIP = "0.0.0.0" # 0.0.0.0 = all interfaces
localPort = 5060
bufferSize = 1024
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id)
client.username_pw_set(mqtt_username, mqtt_password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client, topic, msg):
result = client.publish(topic, msg)
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
def handle_doorbell(door_ip):
if door_ip not in DOOR_STATIONS:
print(f"Unknown door station IP: {door_ip}")
return
door_config = DOOR_STATIONS[door_ip]
door_name = door_config["name"]
topic = door_config["topic"]
client = connect_mqtt()
client.loop_start()
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
print(f"{t}: Doorbell ring detected at {door_name}")
publish(client, topic, "ON")
# ignore other udp sip frames for 5 seconds and change mqtt state back after this timeout
time.sleep(5)
publish(client, topic, "OFF")
client.loop_stop()
def main():
# Create a datagram socket
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
# Bind to address and ip
UDPServerSocket.bind((localIP, localPort))
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
print(f"{t}: UDP server up and listening on port {localPort}")
try:
while True:
bytesAddressPair = UDPServerSocket.recvfrom(bufferSize)
message = bytesAddressPair[0]
address = bytesAddressPair[1]
client_ip = address[0]
decoded_frame = message.decode("utf-8")
sipinvite = decoded_frame.startswith('INVITE sip:ManageCenter@')
if sipinvite:
UDPServerSocket.close()
handle_doorbell(client_ip)
# re-open udp socket
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
UDPServerSocket.bind((localIP, localPort))
except KeyboardInterrupt:
print("\nShutting down server...")
finally:
UDPServerSocket.close()
if __name__ == "__main__":
main()
@wiking-at
Copy link
Author

Listen for SIP invite frames from hikvision doorstations. in my case a DS-K1T342MFWX. This frame is created if you configure this script to run on a listiner which is configured as the "Management Center" and the Intercom action set to "Call Management Center". The listener then trigers MQTT messages.

I run this script in a minimal alpine container.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment