Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active November 12, 2021 00:21
Show Gist options
  • Save anecdata/98ee740b7370dd5bedfb2c88d75ee74f to your computer and use it in GitHub Desktop.
Save anecdata/98ee740b7370dd5bedfb2c88d75ee74f to your computer and use it in GitHub Desktop.
CircuitPython early monitor demo
import gc
import time
import board
import digitalio
import supervisor
import ipaddress
import wifi
import socketpool
import ssl
import espidf
# Runtime switches for the demo.
PARSE = True  # decode header fields and the SSID instead of dumping the first bytes as hex
DEMO = False  # when True, redact the SSID and the lower 3 bytes of each MAC

# Capture buffer size in bytes:
# 24 header + 1 IE ID + 1 IE length + 32 (longest SSID).
MAXBUF = 24 + 1 + 1 + 32
def check_type(mac):
    """Classify a MAC address by the two low bits of its first octet.

    Args:
        mac: Address text whose first two characters are the hex digits of
            the first octet, e.g. "AA:BB:CC:DD:EE:FF".

    Returns:
        "L_M" when both bits are set (local multicast), "LOC" when only
        bit 1 is set (local), "MUL" when only bit 0 is set (multicast),
        "VEN" when neither is set (vendor or unassigned), or "" when the
        first octet cannot be parsed as hexadecimal.
    """
    try:
        # The base is passed positionally: the keyword form was noted as
        # unsupported on CircuitPython, and int() with its default base 10
        # rejects a "0x"-prefixed string, which left every address untyped.
        first_octet = int(mac[0:2], 16)
    except (ValueError, IndexError):
        # Best effort: an unparsable address simply gets no type label.
        return ""

    if (first_octet & 0b0011) == 0b0011:  # 3,7,B,F LOCAL MULTICAST
        return "L_M"
    if (first_octet & 0b0010) == 0b0010:  # 2,6,A,E LOCAL
        return "LOC"
    if (first_octet & 0b0001) == 0b0001:  # 1,5,9,D MULTICAST
        return "MUL"
    return "VEN"  # 0,4,8,C VENDOR (or unassigned)
def parse_frame(buf, buf_len):
    """Decode header fields and, where present, the SSID of a captured frame.

    The fields are read from fixed offsets in ``buf``: byte 0 carries the
    frame type and subtype bits, byte 2 the channel, byte 3 the RSSI,
    bytes 4-21 the three addresses and bytes 22-23 the sequence/fragment
    values. These header offsets are read regardless of ``buf_len``, so a
    frame shorter than 24 bytes yields whatever the buffer already held.

    Args:
        buf: Indexable byte buffer holding the frame (at least 24 bytes).
        buf_len: Number of bytes in ``buf`` that belong to this frame.
            Bytes past this length are never used for the SSID.

    Returns:
        A dict with keys "ch", "rssi", "type", "typename", "subt",
        "subtname", "a1", "a2", "a3", "a1_type", "a2_type", "a3_type",
        "seq", "frag" and "ssid". "ssid" is "" unless a non-empty element
        with ID 0 is found; an SSID extending past the valid bytes is
        truncated to what was captured.
    """
    type_names = ("mgmt", "ctrl", "data", "extn")
    subt_names = ("Assoc Req", "Assoc Resp", "ReAssoc Req", "ReAssoc Resp",
                  "Probe Req", "Probe Resp", "Timing", "Reserved_7",
                  "Beacon", "ATIM", "DisAssoc", "Auth",
                  "DeAuth", "Action", "ActionN", "Reserved_F",)
    # Body bytes skipped before the first information element, by subtype.
    fixed = (4, 6, 10, 6,
             0, 12, 0, 0,
             12, 0, 2, 6,
             2, 0, 0, 0)

    fd = {}
    fd["ch"] = buf[2]
    # Byte 3 is interpreted as a signed 8-bit value stored unsigned.
    fd["rssi"] = buf[3] - 256 if buf[3] > 127 else buf[3]

    fd["type"] = (buf[0] & 0b00001100) >> 2
    fd["typename"] = type_names[fd["type"]]
    fd["subt"] = (buf[0] & 0b11110000) >> 4
    # NOTE(review): the subtype names are management-frame names but are
    # applied to every frame type.
    fd["subtname"] = subt_names[fd["subt"]]

    if not DEMO:
        fd["a1"] = ":".join("%02X" % b for b in buf[4:10])
        fd["a2"] = ":".join("%02X" % b for b in buf[10:16])
        fd["a3"] = ":".join("%02X" % b for b in buf[16:22])
    else:
        # Keep only the first three octets; the rest is redacted.
        fd["a1"] = ":".join("%02X" % b for b in buf[4:7]) + ":RE:DA:CT"
        fd["a2"] = ":".join("%02X" % b for b in buf[10:13]) + ":RE:DA:CT"
        fd["a3"] = ":".join("%02X" % b for b in buf[16:19]) + ":RE:DA:CT"
    fd["a1_type"] = check_type(fd["a1"])
    fd["a2_type"] = check_type(fd["a2"])
    fd["a3_type"] = check_type(fd["a3"])

    # NOTE(review): 802.11 defines Sequence Control as little-endian with the
    # fragment number in the low nibble of the first byte; the extraction
    # below reads it differently -- confirm against the monitor API's layout.
    fd["seq"] = ((buf[22] & 0b00001111) << 8) + buf[23]
    fd["frag"] = (buf[22] & 0b11110000) >> 4

    fd["ssid"] = ""
    # Only management frames (type 0) carry information elements; for other
    # types the same subtype numbers mean something else entirely.
    if fd["type"] == 0 and fd["subt"] in (1, 4, 5, 8):
        # The caller reuses one buffer, so anything beyond buf_len is left
        # over from an earlier frame and must not be parsed.
        valid = min(buf_len, len(buf))
        ie_start = 24 + fixed[fd["subt"]]
        if ie_start + 2 <= valid:
            ieid = buf[ie_start]
            ielen = buf[ie_start + 1]
            if ieid == 0 and ielen > 0:
                if DEMO:
                    fd["ssid"] = "SSID found"
                else:
                    body_start = ie_start + 2
                    # Clamp instead of overrunning when the capture buffer
                    # was too small to hold the whole SSID.
                    body_end = min(body_start + ielen, valid)
                    ssid = "".join(chr(b) for b in buf[body_start:body_end])
                    fd["ssid"] = "IE ID " + str(ieid) + " IE LEN " + str(ielen) + " " + ssid
    return fd
# Start-up countdown: "1", "2", "3" with a one-second pause after the first two.
for tick in ("1", "2"):
    print(tick)
    time.sleep(1)
print("3")

led = digitalio.DigitalInOut(board.LED)
led.switch_to_output(True)  # Cucumber amber LED is active-low

# Print one line per visible network; start over whenever the scan raises
# RuntimeError.
scanned = False
while not scanned:
    try:
        for network in wifi.radio.start_scanning_networks():
            bssid_text = "{0:02X}:{1:02X}:{2:02X}:{3:02X}:{4:02X}:{5:02X}".format(*network.bssid)
            print(bssid_text, end=" ")
            print("{:>2d}".format(network.channel), end=" ")
            print(network.rssi, network.authmode, network.country, network.ssid)
        wifi.radio.stop_scanning_networks()
        scanned = True
    except RuntimeError as err:
        print("WiFi Scan:", err)
print("-" * 49)

buf = bytearray(MAXBUF)
loop = 1  # running count of frames printed
while True:
    gc.collect()
    wifi.radio.set_monitor_enabled(True)
    # The inner loop has no exit, so the disable call after it is never
    # reached; a bounded `for _ in range(0, 100):` here would cycle monitor
    # mode every 100 polls instead.
    while True:
        # LED value is held False only for the duration of the receive call.
        led.value = False
        received = wifi.radio.monitor_recv_into(buf, MAXBUF)
        led.value = True
        if received <= 0:
            continue

        print(f'{loop: 6d} {(time.monotonic_ns() / 1_000_000_000): 9.3f}s', end=" ")
        if PARSE:
            fd = parse_frame(buf, received)
            print(
                f'{received:02d}',
                f'{fd["ch"]:02d} {fd["rssi"]:+2d} {fd["typename"]}',
                f'{fd["a1"]} {fd["a1_type"]} {fd["a2"]} {fd["a2_type"]} {fd["a3"]} {fd["a3_type"]}',
                f'{fd["seq"]:03d} {fd["frag"]:02d} {fd["subtname"]}',
                f'{fd["ssid"]}',
                end=" "
            )
        else:
            # Raw mode: just the first four bytes as hex.
            print(" ".join("%02X" % octet for octet in buf[0:4]), end=" ")

        # Append memory statistics on frames 1, 101, 201, ...
        if loop % 100 == 1:
            print(
                gc.mem_free(),
                espidf.heap_caps_get_total_size(),
                espidf.heap_caps_get_free_size(),
                espidf.heap_caps_get_largest_free_block(),
                end=" "
            )
        print()
        loop += 1
    wifi.radio.set_monitor_enabled(False)
@anecdata
Copy link
Author

This was test code for a prior API for Monitor mode.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment