-
-
Save pergolafabio/1349c6a05af52b70a558995bd3c29fff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
from datetime import datetime | |
import paho.mqtt.client as mqtt_client | |
# Door station configurations | |
DOOR_STATIONS = { | |
"10.10.2.28": { | |
"name": "Haupteingang", | |
"topic": "frigate/Haupteingang/klingeln/state" | |
}, | |
"10.10.2.27": { | |
"name": "Nebeneingang", | |
"topic": "frigate/Nebeneingang/klingeln/state" | |
} | |
} | |
# MQTT settings | |
broker = 'mqtt' | |
port = 1883 | |
client_id = f'hikvision_sipserver' | |
mqtt_username = 'hik-sip' | |
mqtt_password = 'redacted' | |
# UDP listener settings | |
localIP = "0.0.0.0" # 0.0.0.0 = all interfaces | |
localPort = 5060 | |
bufferSize = 1024 | |
def connect_mqtt(): | |
def on_connect(client, userdata, flags, rc): | |
if rc == 0: | |
print("Connected to MQTT Broker!") | |
else: | |
print("Failed to connect, return code %d\n", rc) | |
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id) | |
client.username_pw_set(mqtt_username, mqtt_password) | |
client.on_connect = on_connect | |
client.connect(broker, port) | |
return client | |
def publish(client, topic, msg): | |
result = client.publish(topic, msg) | |
status = result[0] | |
if status == 0: | |
print(f"Send `{msg}` to topic `{topic}`") | |
else: | |
print(f"Failed to send message to topic {topic}") | |
def handle_doorbell(door_ip): | |
if door_ip not in DOOR_STATIONS: | |
print(f"Unknown door station IP: {door_ip}") | |
return | |
door_config = DOOR_STATIONS[door_ip] | |
door_name = door_config["name"] | |
topic = door_config["topic"] | |
client = connect_mqtt() | |
client.loop_start() | |
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") | |
print(f"{t}: Doorbell ring detected at {door_name}") | |
publish(client, topic, "ON") | |
# ignore other udp sip frames for 5 seconds and change mqtt state back after this timeout | |
time.sleep(5) | |
publish(client, topic, "OFF") | |
client.loop_stop() | |
def main(): | |
# Create a datagram socket | |
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) | |
# Bind to address and ip | |
UDPServerSocket.bind((localIP, localPort)) | |
t = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") | |
print(f"{t}: UDP server up and listening on port {localPort}") | |
try: | |
while True: | |
bytesAddressPair = UDPServerSocket.recvfrom(bufferSize) | |
message = bytesAddressPair[0] | |
address = bytesAddressPair[1] | |
client_ip = address[0] | |
decoded_frame = message.decode("utf-8") | |
sipinvite = decoded_frame.startswith('INVITE sip:ManageCenter@') | |
if sipinvite: | |
UDPServerSocket.close() | |
handle_doorbell(client_ip) | |
# re-open udp socket | |
UDPServerSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) | |
UDPServerSocket.bind((localIP, localPort)) | |
except KeyboardInterrupt: | |
print("\nShutting down server...") | |
finally: | |
UDPServerSocket.close() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment