Skip to content

Instantly share code, notes, and snippets.

@mk-fg
Last active November 6, 2023 23:59
Show Gist options
  • Save mk-fg/9bc6d9ff19771dd935ad195c937b5ac7 to your computer and use it in GitHub Desktop.
Save mk-fg/9bc6d9ff19771dd935ad195c937b5ac7 to your computer and use it in GitHub Desktop.
import os, network
try: import uasyncio as asyncio
except ImportError: import asyncio
async def wifi_client(ap_base, ap_map):
    """Keep the station-mode WiFi interface connected to one of the known APs.

    Runs an endless loop: while disconnected, retries the last-used AP once,
    otherwise scans and connects to the first ap_map entry seen in scan
    results; while connected, just re-checks the link periodically.

    ap_base: dict of parameters shared by all APs. 'scan_interval' and
        'check_interval' (seconds between loop iterations) are required,
        'verbose' (log every step) and 'country' are optional, and so are
        any of the per-AP parameters described below.
    ap_map: dict of SSID string -> dict of per-AP parameters that override
        ap_base ones, iterated in order of preference. 'key' can be omitted
        or empty for open networks, 'mac' pins connection to a specific BSSID.
        Parameters named in ap_keys below are passed on to wifi.config().

    Does not return - the loop only ends via an exception or task cancellation.
    """

    def ssid_str(ssid):
        # Scan results have SSIDs as bytes - get a string to match and print them
        try:
            return ssid.decode()  # mpy 1.20 doesn't support errors= handling
        except UnicodeError:
            return repr(ssid)[2:-1]  # ascii + backslash-escapes

    # p_log is a print-function only if ap_base['verbose'] is set, falsy otherwise
    p_log = ap_base.get('verbose') and (lambda *a: print('[wifi]', *a))
    if cc := ap_base.get('country'):
        network.country(cc)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    if p_log:
        p_log('Activated')

    # ap_conn - parameters of the AP that a connection attempt was started for
    # ap_reconn - parameters of the AP that got connected, kept for one retry
    # addr_last - last IP address that was printed in non-verbose mode
    ap_conn = ap_reconn = addr_last = None
    # Only these parameters are passed to wifi.config(), if they are set
    ap_keys = [
        'ssid', 'key', 'hostname', 'pm',
        'channel', 'reconnects', 'txpower', 'mac', 'hidden',
    ]

    while True:
        if not (st := wifi.isconnected()):
            if p_log:
                p_log(
                    f'Not-connected (reconnect={(ap_reconn or dict()).get("ssid", "no")})'
                )
            ap_conn = None
            if ap_reconn:  # reset same-ssid conn once on hiccups
                ap_conn, ap_reconn = ap_reconn, None
            if not ap_conn:
                # Printable SSID -> raw SSID bytes, for every AP found by the scan
                ssid_map = {
                    ssid_str(ap_info[0]): ap_info[0] for ap_info in wifi.scan()
                }
                if p_log:
                    p_log(f'Scan results [ {" // ".join(sorted(ssid_map))} ]')
                for ssid, ap in ap_map.items():
                    if ssid_raw := ssid_map.get(ssid):
                        ap_conn = dict(ap_base, ssid=ssid_raw, **ap)
                        break
            if ap_conn:
                if p_log:
                    p_log(f'Connecting to [ {ssid_str(ap_conn["ssid"])} ]')
                wifi.config(**{k: ap_conn[k] for k in ap_keys if k in ap_conn})
                wifi.connect(
                    ssid=ap_conn['ssid'],
                    # 'key' can be missing or empty for open networks
                    key=ap_conn.get('key') or None,
                    bssid=ap_conn.get('mac') or None,
                )
        elif ap_conn:
            # Connection attempt succeeded - remember AP for a reconnect later
            ap_conn, ap_reconn = None, ap_conn

        if ap_conn:
            st, delay = 'connecting', ap_conn['scan_interval']
        elif ap_reconn or st:  # can also be connection from earlier script-run
            st, delay = 'connected', (ap_reconn or ap_base)['check_interval']
        else:
            st, delay = 'searching', ap_base['scan_interval']

        if addrs := wifi.ifconfig():
            if p_log:
                st += f' [{addrs[0]}]'
            elif addr_last != addrs[0]:
                # Non-verbose mode only reports the address when it changes
                addr_last = addrs[0]
                print('[wifi] Current IP Address:', addr_last)
        if p_log:
            p_log(f'State = {st}, delay = {delay:.1f}s')
        await asyncio.sleep(delay)

    # Not reachable unless a "break" gets added to the loop above by mistake
    raise RuntimeError('BUG - wifi loop stopped unexpectedly')
async def actual_workload_task():
    """Stand-in for whatever the device is actually supposed to be doing.

    Sleeps for 5 seconds, then prints one of two marker lines,
    alternating between them on every iteration.
    """
    msg_even, msg_odd = ' <<< beep <<<', ' >>> boop >>>'
    for tick in range(2**31):
        await asyncio.sleep(5)
        if tick % 2:
            print(msg_odd)
        else:
            print(msg_even)
async def main():
    """Set up the WiFi AP configuration and run all app tasks concurrently."""
    # Settings shared by every AP in wifi_ap_map below.
    # Intervals here are short for quick testing only -
    #  much longer ones are probably what you'd want in practice.
    wifi_ap_base = {
        'verbose': True,
        'scan_interval': 20.0,
        'check_interval': 10.0,
        # More options, e.g.: country='XX', pm='powersave', hidden=True, hostname='myrp2', ...
    }
    # SSID -> per-AP settings, listed in first-pick to last-pick order.
    # Some of the wifi_ap_base parameters above can be set per-AP here.
    wifi_ap_map = {
        'MySSID': {'key': 'trunk yam blaze score boat', 'hostname': 'itsme'},
        'Password is Password': {'key': 'Password', 'check_interval': 120},
        'ᓚᘏᗢ': {'key': 'this is a cat'},
        # ... and any number of other APs to connect to
    }
    tasks = (
        wifi_client(wifi_ap_base, wifi_ap_map),
        actual_workload_task(),
    )
    print('----- app started -----')
    try:
        await asyncio.gather(*tasks)
    finally:
        print('----- app finished -----')
# Start the app only when this file is run as a script, not when imported
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment