Created
July 7, 2016 10:26
-
-
Save wolfhechel/f3e5ac620f96ddfc26440cddbc307f91 to your computer and use it in GitHub Desktop.
Testing KEvent notifications with asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import fcntl
import socket
import struct
import sys
# ioctl request used by sysproto_event_socket() to install an event filter on
# a kernel event socket. The argument is three packed unsigned ints:
# (vendor, class, subclass).
SIOCSKEVFILT = 0x800c6502

# Protocol number passed to socket.socket() for PF_SYSTEM kernel event sockets.
SYSPROTO_EVENT = 1

# Filter values: vendor / class / subclass.
KEV_VENDOR_APPLE = 1
KEV_NETWORK_CLASS = 1
KEV_ANY_SUBCLASS = 0

# --- inet subclass and its event codes ---
KEV_INET_SUBCLASS = 1  # inet subclass identifier
KEV_INET_NEW_ADDR = 1  # Userland configured IP address
KEV_INET_CHANGED_ADDR = 2  # Address changed event
KEV_INET_ADDR_DELETED = 3  # IP address was deleted

# --- inet6 subclass and its event codes ---
KEV_INET6_SUBCLASS = 6  # inet6 subclass identifier
KEV_INET6_NEW_USER_ADDR = 1  # Userland configured IPv6 address
KEV_INET6_CHANGED_ADDR = 2  # Address changed event (future)
KEV_INET6_ADDR_DELETED = 3  # IPv6 address was deleted
KEV_INET6_NEW_LL_ADDR = 4  # Autoconf LL address appeared
KEV_INET6_NEW_RTADV_ADDR = 5  # Autoconf address has appeared

# --- data-link subclass and its event codes ---
KEV_DL_SUBCLASS = 2
KEV_DL_LINK_OFF = 12
KEV_DL_LINK_ON = 13

# Fixed-size header that starts every event message, six native unsigned ints:
# total_size, vendor_code, kev_class, kev_subclass, id, event_code.
kern_event_msg_hdr = struct.Struct('6I')

# Leading part of the event payload: two native unsigned ints followed by a
# 16-byte NUL-padded interface name. The second int is the interface unit
# number; the first is unused by this file (presumably the interface family --
# confirm against the kernel headers).
net_event_data = struct.Struct('2I16s')
def sysproto_event_socket(event_vendor, event_class, event_subclass):
    """Open a kernel event socket filtered to the given vendor/class/subclass.

    Args:
        event_vendor: Vendor code to filter on (e.g. ``KEV_VENDOR_APPLE``).
        event_class: Event class to filter on (e.g. ``KEV_NETWORK_CLASS``).
        event_subclass: Event subclass to filter on (e.g. ``KEV_ANY_SUBCLASS``).

    Returns:
        The open raw ``PF_SYSTEM`` socket with the filter installed. The
        caller owns it and is responsible for closing it.

    Raises:
        OSError: If the socket cannot be created or the filter ioctl fails.
    """
    sock = socket.socket(socket.PF_SYSTEM, socket.SOCK_RAW, SYSPROTO_EVENT)
    try:
        event_filter = struct.pack('3I', event_vendor, event_class, event_subclass)
        fcntl.ioctl(sock, SIOCSKEVFILT, event_filter)
    except BaseException:
        # Do not leak the descriptor when the filter cannot be installed.
        sock.close()
        raise
    return sock
class KEventNetworkMonitor(object):
    """Read kernel network events from a socket and dispatch them to hooks.

    A background task is started on the supplied event loop as soon as the
    monitor is constructed. Each received message is decoded and passed to
    ``handle_event``, which calls ``interface_up``, ``interface_down`` or
    ``interface_changed``. Subclasses override those coroutines to react;
    the defaults just print.

    Events are handled one at a time, in arrival order. Call ``close()`` to
    stop the reader and release the socket.
    """

    _sock = None
    _loop = None
    _task = None

    def __init__(self, loop):
        """Open the event socket and start the reader task on *loop*."""
        self._loop = loop
        self._sock = self._construct_socket()
        # Keep a strong reference to the task: without one it may be
        # garbage-collected mid-flight, and close() could not cancel it.
        self._task = self._loop.create_task(self._start_reading())

    def _construct_socket(self):
        """Return a non-blocking event socket for all network subclasses."""
        sock = sysproto_event_socket(KEV_VENDOR_APPLE, KEV_NETWORK_CLASS, KEV_ANY_SUBCLASS)
        sock.setblocking(False)
        return sock

    async def _receive_msg(self):
        """Receive one chunk from the socket and trim it to its declared size.

        Returns:
            The received bytes, cut to the ``total_size`` stored in the first
            four bytes. A chunk shorter than four bytes is returned unchanged
            so the caller can reject it.
        """
        buffer = await self._loop.sock_recv(self._sock, 1024)
        if len(buffer) < 4:
            # Not even a complete length field; do not guess a size from it.
            return buffer
        total_size = int.from_bytes(buffer[:4], sys.byteorder)
        if len(buffer) > total_size:
            print('discarded data, shit')
        return buffer[:total_size]

    async def interface_down(self, interface):
        """Hook called when *interface* reports link down."""
        print('%s down' % interface)

    async def interface_up(self, interface):
        """Hook called when *interface* reports link up."""
        print('%s up' % interface)

    async def interface_changed(self, interface):
        """Hook called when an inet address on *interface* is added, changed or deleted."""
        print('%s changed' % interface)

    async def handle_event(self, vendor_code, kev_class, kev_subclass, id, event_code, event_data):
        """Map one decoded event to an interface hook and await it.

        Only data-link link on/off events and inet address
        new/changed/deleted events are dispatched. Everything else, and any
        event whose payload is too short to carry the interface record, is
        ignored.

        Args:
            vendor_code, kev_class, kev_subclass, id, event_code: Header
                fields of the event message.
            event_data: Payload bytes following the header.
        """
        method = None
        if kev_subclass == KEV_DL_SUBCLASS:
            if event_code == KEV_DL_LINK_ON:
                method = self.interface_up
            elif event_code == KEV_DL_LINK_OFF:
                method = self.interface_down
        elif kev_subclass == KEV_INET_SUBCLASS:
            if event_code in (KEV_INET_NEW_ADDR, KEV_INET_CHANGED_ADDR, KEV_INET_ADDR_DELETED):
                method = self.interface_changed

        if method is None:
            return
        if len(event_data) < net_event_data.size:
            # Truncated payload: there is no interface record to decode.
            return

        _family, unit, raw_name = net_event_data.unpack_from(event_data)
        # The name is a NUL-terminated string in a fixed-size field; only the
        # bytes before the first NUL belong to it.
        name = raw_name.split(b'\0', 1)[0].decode('ascii', 'replace')
        await method('%s%d' % (name, unit))

    async def _start_reading(self):
        """Read and dispatch events until cancelled or the socket hits EOF.

        Messages too short to contain a header are skipped. An exception
        raised by a handler is reported through the loop's exception handler
        and does not stop monitoring.
        """
        header_size = kern_event_msg_hdr.size
        while True:
            message = await self._receive_msg()
            if not message:
                # Zero-length read: nothing more will arrive on this socket.
                return
            if len(message) < header_size:
                continue

            total_size, vendor_code, kev_class, kev_subclass, id, event_code = \
                kern_event_msg_hdr.unpack_from(message)
            event_data = message[header_size:]
            try:
                await self.handle_event(vendor_code, kev_class, kev_subclass, id, event_code, event_data)
            except asyncio.CancelledError:
                # Explicit re-raise: on older Pythons CancelledError is an
                # Exception subclass and would be swallowed below.
                raise
            except Exception as exc:
                self._loop.call_exception_handler({
                    'message': 'Unhandled exception in KEventNetworkMonitor handler',
                    'exception': exc,
                })

    def close(self):
        """Cancel the reader task and close the socket. Safe to call twice."""
        task, self._task = self._task, None
        if task is not None:
            task.cancel()
        sock, self._sock = self._sock, None
        if sock is not None:
            sock.close()
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    monitor = KEventNetworkMonitor(loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this script.
        pass
    finally:
        # Release the event socket and the loop however the loop was stopped.
        monitor.close()
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment