Last active
February 25, 2021 00:47
-
-
Save elupus/d52ef3f7c3f0706638d7beba5e7ab58a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from asyncio.events import AbstractEventLoop | |
import logging | |
import socket | |
import struct | |
from typing import Optional, Tuple | |
USER_AGENT = "HassTest" | |
def create_listen_socket(addrinfo_iface, addrinfo_mcast): | |
sock = socket.socket(addrinfo_mcast[0], socket.SOCK_DGRAM) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
group_bin = socket.inet_pton(addrinfo_mcast[0], addrinfo_mcast[4][0]) | |
iface_bin = socket.inet_pton(addrinfo_iface[0], addrinfo_iface[4][0]) | |
print(addrinfo_mcast[4]) | |
print(addrinfo_iface[4]) | |
bind_address = (addrinfo_mcast[4][0], *addrinfo_iface[4][1:]) | |
print(bind_address) | |
sock.bind(bind_address) | |
if addrinfo_mcast[0] == socket.AF_INET: # IPv4 | |
mreq = group_bin + iface_bin | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) | |
else: | |
ifn = struct.pack("@I", addrinfo_iface[4][3]) | |
mreq = group_bin + ifn | |
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) | |
return sock | |
def create_search_socket(addrinfo): | |
sock = socket.socket(addrinfo[0], socket.SOCK_DGRAM) | |
if addrinfo[0] == socket.AF_INET6: | |
sock.setsockopt(socket.IPPROTO_IPV6, socket.IP_MULTICAST_TTL, 2) | |
#sock.connect(addrinfo[4]) | |
return sock | |
class SsdpProtocol(asyncio.DatagramProtocol): | |
def __init__(self, addrinfo): | |
self.addrinfo = addrinfo | |
self.transport = None | |
def connection_made(self, transport): | |
self.transport = transport | |
def datagram_received(self, data, addr): | |
print('Received data from {!r}\n{}\n\n'.format(addr, data.decode("utf8"))) | |
def error_received(self, exc): | |
print('Error %s', exc) | |
def close(self): | |
print('Closed') | |
self.transport.close() | |
def msearch(self, st, mx: Optional[int] = None, host: Optional[str] = None, address: Optional[Tuple[str, int]] = None): | |
data = [ | |
f'M-SEARCH * HTTP/1.1', | |
f'ST: {st}', | |
f'MAN: "ssdp:discover"', | |
f'USER-AGENT: {USER_AGENT}' | |
] | |
if mx is not None: | |
data.append(f'MX: {mx:d}') | |
if host is None: | |
if self.addrinfo[0] == socket.AF_INET6: | |
host = f"[{self.addrinfo[4][0]}]:{self.addrinfo[4][1]}" | |
else: | |
host = f"{self.addrinfo[4][0]}:{self.addrinfo[4][1]}" | |
data.append(f'HOST: {host}') | |
data.append('') | |
data.append('') | |
print(data) | |
if address is None: | |
address = self.addrinfo[4] | |
print(address) | |
self.transport.sendto("\r\n".join(data).encode('utf-8'), address) | |
@staticmethod | |
async def create_searcher(loop: AbstractEventLoop, remote: Tuple[str, int]): | |
addrinfo = socket.getaddrinfo(remote[0], remote[1], type=socket.SOCK_DGRAM) | |
print(addrinfo) | |
sock = create_search_socket(addrinfo[0]) | |
_, protocol = await loop.create_datagram_endpoint( | |
lambda: SsdpProtocol(addrinfo[0]), | |
sock=sock, | |
) | |
return protocol | |
@staticmethod | |
async def create_listener(loop: AbstractEventLoop, local: Tuple[str, int], remote: Tuple[str, int]): | |
addrinfo_mcast = socket.getaddrinfo(remote[0], remote[1], flags=socket.AI_PASSIVE, type=socket.SOCK_DGRAM) | |
addrinfo_iface = socket.getaddrinfo(local[0], local[0], family=addrinfo_mcast[0][0], flags=socket.AI_PASSIVE, type=socket.SOCK_DGRAM) | |
sock = create_listen_socket(addrinfo_iface[0], addrinfo_mcast[0]) | |
_, protocol = await loop.create_datagram_endpoint( | |
lambda: SsdpProtocol(addrinfo_mcast[0]), | |
sock=sock, | |
) | |
return protocol | |
async def listener(loop): | |
#await SsdpProtocol.create_listener(loop, ("localhost", 1900), ("239.255.255.250", 1900)) | |
#protocol = await SsdpProtocol.create_listener(loop, ("192.168.16.116", 1900), ("239.255.255.250", 1900)) | |
#protocol.msearch("ssdp:all", 0, "239.255.255.250:1900") | |
#await SsdpProtocol.create_listener(loop, ("::1%en0", 1900), "FF02::C") | |
pass | |
async def searcher(loop): | |
protocol = await SsdpProtocol.create_searcher(loop, ("239.255.255.250", 1900)) | |
#protocol = await SsdpProtocol.create_searcher(loop, ("FF02::C", 1900)) | |
#protocol = await SsdpProtocol.create_searcher(loop, ("255.255.255.255", 1900)) | |
protocol.msearch("ssdp:all", 0, address=("192.168.16.36", 1900)) | |
#protocol.msearch("ssdp:all", 0, address=("FF02::C", 1900, 0, 1)) | |
#protocol.msearch("ssdp:all", 5) | |
#protocol.msearch("upnp:rootdevice", 2, "239.255.255.250:1900") | |
return protocol | |
def run(): | |
loop = asyncio.get_event_loop() | |
loop.set_debug(True) | |
logging.basicConfig(level=logging.DEBUG) | |
#protocol = loop.run_until_complete(listener(loop)) | |
protocol = loop.run_until_complete(searcher(loop)) | |
loop.run_forever() | |
loop.close() | |
if __name__ == "__main__": | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment