Last active
November 12, 2021 00:21
-
-
Save anecdata/98ee740b7370dd5bedfb2c88d75ee74f to your computer and use it in GitHub Desktop.
CircuitPython early monitor demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import time | |
import board | |
import digitalio | |
import supervisor | |
import ipaddress | |
import wifi | |
import socketpool | |
import ssl | |
import espidf | |
PARSE = True # parse packet header fields, and SSID from packet body | |
DEMO = False # when True, redact SSID & lower 3 bytes of MACs | |
MAXBUF = 58 # 24 header + 1 IE ID + 1 IE len + 32 max SSID | |
def check_type(mac): | |
# determine MAC type | |
mactype = "" | |
try: | |
# mac_int = int('0x' + mac[1], base=16) # not supported in CP | |
mac_int = int("".join(("0x", mac[0:2]))) | |
if (mac_int & 0b0011) == 0b0011: # 3,7,B,F LOCAL MULTICAST | |
mactype = "L_M" | |
elif (mac_int & 0b0010) == 0b0010: # 2,3,6,7,A,B,E,F LOCAL | |
mactype = "LOC" | |
elif (mac_int & 0b0001) == 0b0001: # 1,3,5,7,9,B,D,F MULTICAST | |
mactype = "MUL" | |
else: # 0,4,8,C VENDOR (or unassigned) | |
mactype = "VEN" | |
except (ValueError, IndexError) as e: | |
pass | |
return mactype | |
def parse_frame(buf, buf_len): | |
fd = {} | |
type_names = ("mgmt", "ctrl", "data", "extn") | |
subt_names = ("Assoc Req", "Assoc Resp", "ReAssoc Req", "ReAssoc Resp", | |
"Probe Req", "Probe Resp", "Timing", "Reserved_7", | |
"Beacon", "ATIM", "DisAssoc", "Auth", | |
"DeAuth", "Action", "ActionN", "Reserved_F",) | |
fixed = (4,6,10,6, | |
0,12,0,0, | |
12,0,2,6, | |
2,0,0,0) | |
fd["ch"] = buf[2] | |
if buf[3] > 127: | |
fd["rssi"] = (256-buf[3]) * (-1) | |
else: | |
fd["rssi"] = buf[3] | |
fd["type"] = (buf[0] & 0b00001100) >> 2 | |
fd["typename"] = type_names[fd["type"]] | |
fd["subt"] = (buf[0] & 0b11110000) >> 4 | |
fd["subtname"] = subt_names[fd["subt"]] | |
if not DEMO: | |
fd["a1"] = ":".join("%02X" % _ for _ in buf[4:10]) | |
fd["a2"] = ":".join("%02X" % _ for _ in buf[10:16]) | |
fd["a3"] = ":".join("%02X" % _ for _ in buf[16:22]) | |
else: | |
fd["a1"] = ":".join("%02X" % _ for _ in buf[4:7]) + ":RE:DA:CT" | |
fd["a2"] = ":".join("%02X" % _ for _ in buf[10:13]) + ":RE:DA:CT" | |
fd["a3"] = ":".join("%02X" % _ for _ in buf[16:19]) + ":RE:DA:CT" | |
fd["a1_type"] = check_type(fd["a1"]) | |
fd["a2_type"] = check_type(fd["a2"]) | |
fd["a3_type"] = check_type(fd["a3"]) | |
fd["seq"] = ((buf[22] & 0b00001111) << 8) + buf[23] | |
fd["frag"] = (buf[22] & 0b11110000) >> 4 | |
fd["ssid"] = "" | |
if fd["subt"] in (1, 4, 5, 8): | |
try: | |
ieid = buf[24 + fixed[fd["subt"]]] | |
ielen = buf[24 + fixed[fd["subt"]] + 1] | |
iendx = 24 + fixed[fd["subt"]] + 2 | |
if (ieid == 0) and (ielen > 0): | |
ssid = "" | |
for _ in range(iendx, iendx+ielen): | |
ssid = ssid + chr(buf[_]) | |
# print("DEBUG", ieid, ielen, iendx, ssid) | |
if not DEMO: | |
fd["ssid"] = "IE ID " + str(ieid) + " IE LEN " + str(ielen) + " " + ssid | |
else: | |
fd["ssid"] = "SSID found" | |
except IndexError as e: | |
print("IndexError", e) | |
return fd | |
print("1") | |
time.sleep(1) | |
print("2") | |
time.sleep(1) | |
print("3") | |
led = digitalio.DigitalInOut(board.LED) | |
led.switch_to_output(True) # Cucumber amber LED is active-low | |
while True: | |
try: | |
for network in wifi.radio.start_scanning_networks(): | |
print("{0:02X}:{1:02X}:{2:02X}:{3:02X}:{4:02X}:{5:02X}".format(*network.bssid), end=" ") | |
print("{:>2d}".format(network.channel), end=" ") | |
print(network.rssi, network.authmode, network.country, network.ssid) | |
wifi.radio.stop_scanning_networks() | |
break; | |
except (RuntimeError) as e: | |
print("WiFi Scan:", e) | |
print("-"*49) | |
buf = bytearray(MAXBUF) | |
loop = 1 | |
while True: | |
gc.collect() | |
wifi.radio.set_monitor_enabled(True) | |
# for _ in range(0, 100): | |
while True: | |
led.value = False | |
received = wifi.radio.monitor_recv_into(buf, MAXBUF) | |
led.value = True | |
if received > 0: | |
print(f'{loop: 6d} {(time.monotonic_ns() / 1_000_000_000): 9.3f}s', end=" ") | |
if PARSE: | |
fd = parse_frame(buf, received) | |
print(f'{received:02d}', end=" ") | |
print(f'{fd["ch"]:02d} {fd["rssi"]:+2d} {fd["typename"]}', end=" ") | |
print(f'{fd["a1"]} {fd["a1_type"]} {fd["a2"]} {fd["a2_type"]} {fd["a3"]} {fd["a3_type"]}', end=" ") | |
print(f'{fd["seq"]:03d} {fd["frag"]:02d} {fd["subtname"]}', end=" ") | |
print(f'{fd["ssid"]}', end=" ") | |
else: | |
print(" ".join("%02X" % _ for _ in buf[0:4]), end=" ") | |
if loop % 100 == 1: | |
print(gc.mem_free(), espidf.heap_caps_get_total_size(), espidf.heap_caps_get_free_size(), espidf.heap_caps_get_largest_free_block(), end=" ") | |
print() | |
loop += 1 | |
wifi.radio.set_monitor_enabled(False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This was test code for a prior API for Monitor mode.