Last active
November 6, 2023 23:59
-
-
Save mk-fg/9bc6d9ff19771dd935ad195c937b5ac7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, network | |
try: import uasyncio as asyncio | |
except ImportError: import asyncio | |
async def wifi_client(ap_base, ap_map):
    """Keep the station WiFi interface connected to a known access point.

    Loops forever: while the interface is not connected it retries the
    previously-working AP once, otherwise scans and picks the first entry
    of ap_map that shows up in the scan results, then sleeps until the
    next check.

    Args:
        ap_base: dict of settings shared by all APs. Read here are the
            optional 'verbose' and 'country' keys plus the 'scan_interval'
            and 'check_interval' delays (seconds). Any keys listed in
            ap_keys below are forwarded to wifi.config().
        ap_map: dict of SSID string -> dict of per-AP settings, which are
            laid over ap_base. Iterated in order, first match wins.
            A missing or empty 'key' is treated as an open network.

    Raises:
        RuntimeError: only if the endless loop is ever left.
    """

    def ssid_str(ssid):
        # Raw SSID bytes -> printable str, also used as ap_map lookup key
        try:
            return ssid.decode()  # mpy 1.20 doesn't support errors= handling
        except UnicodeError:
            return repr(ssid)[2:-1]  # ascii + backslash-escapes

    # p_log is falsy when not verbose, hence the "p_log and p_log(...)" calls
    p_log = ap_base.get('verbose') and (lambda *a: print('[wifi]', *a))
    if cc := ap_base.get('country'):
        network.country(cc)
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    p_log and p_log('Activated')

    # ap_conn   - settings of the AP a connection attempt is in progress for
    # ap_reconn - settings of the AP that was connected on the last check,
    #             kept to retry it once without a scan if the link drops
    # addr_last - last address printed in non-verbose mode
    ap_conn = ap_reconn = addr_last = None
    # Subset of the merged AP settings that gets passed to wifi.config()
    ap_keys = [
        'ssid', 'key', 'hostname', 'pm',
        'channel', 'reconnects', 'txpower', 'mac', 'hidden',
    ]

    while True:
        if not (st := wifi.isconnected()):
            p_log and p_log(
                f'Not-connected (reconnect={(ap_reconn or dict()).get("ssid", "no")})')
            ap_conn = None
            if ap_reconn:  # reset same-ssid conn once on hiccups
                ap_conn, ap_reconn = ap_reconn, None
            if not ap_conn:
                # First item of each scan entry is the raw SSID
                ssid_map = {
                    ssid_str(ap_info[0]): ap_info[0] for ap_info in wifi.scan()}
                p_log and p_log(f'Scan results [ {" // ".join(sorted(ssid_map))} ]')
                for ssid, ap in ap_map.items():
                    if ssid_raw := ssid_map.get(ssid):
                        ap_conn = dict(ap_base, ssid=ssid_raw, **ap)
                        break
            if ap_conn:
                p_log and p_log(f'Connecting to [ {ssid_str(ap_conn["ssid"])} ]')
                wifi.config(**{k: ap_conn[k] for k in ap_keys if k in ap_conn})
                # .get() so that AP entries without a 'key' (open networks)
                # do not fail with KeyError here
                wifi.connect(
                    ssid=ap_conn['ssid'],
                    key=ap_conn.get('key') or None,
                    bssid=ap_conn.get('mac') or None)
        elif ap_conn:
            # Connection attempt succeeded - remember it for a quick retry
            ap_conn, ap_reconn = None, ap_conn

        if ap_conn:
            st, delay = 'connecting', ap_conn['scan_interval']
        elif ap_reconn or st:  # can also be connection from earlier script-run
            st, delay = 'connected', (ap_reconn or ap_base)['check_interval']
        else:
            st, delay = 'searching', ap_base['scan_interval']

        if addrs := wifi.ifconfig():
            if p_log:
                st += f' [{addrs[0]}]'
            elif addr_last != addrs[0]:
                # Non-verbose mode: only report the address when it changes
                print('[wifi] Current IP Address:', addr_last := addrs[0])
        p_log and p_log(f'State = {st}, delay = {delay:.1f}s')
        await asyncio.sleep(delay)

    raise RuntimeError('BUG - wifi loop stopped unexpectedly')
async def actual_workload_task():
    """Stand-in for the real job the device is supposed to be doing.

    Every 5 seconds prints one of two alternating marker lines,
    starting with the "beep" one.
    """
    sounds = (' <<< beep <<<', ' >>> boop >>>')
    for tick in range(2**31):
        await asyncio.sleep(5)
        # Even ticks -> first line, odd ticks -> second line
        print(sounds[tick % 2])
async def main():
    """Run the WiFi client and the workload task concurrently.

    Prints a start marker before both coroutines are awaited together
    and a finish marker whenever that wait ends, including on errors.
    """
    # Settings shared by every AP in known_aps below.
    # These short intervals are just for quick testing, you'd probably
    # want much longer ones in practice.
    # More options: country='XX', pm='powersave', hidden=True, hostname='myrp2', ...
    shared_cfg = {
        'verbose': True,
        'scan_interval': 20.0,
        'check_interval': 10.0,
    }
    # APs to connect to, in first-pick to last-pick order.
    # Some of the shared_cfg parameters can be overridden per-AP here.
    known_aps = {
        'MySSID': {'key': 'trunk yam blaze score boat', 'hostname': 'itsme'},
        'Password is Password': {'key': 'Password', 'check_interval': 120},
        'ᓚᘏᗢ': {'key': 'this is a cat'},
        # ... and any number of other APs
    }
    net_task = wifi_client(shared_cfg, known_aps)
    work_task = actual_workload_task()
    print('----- app started -----')
    try:
        await asyncio.gather(net_task, work_task)
    finally:
        print('----- app finished -----')
# Script entry point - start the event loop with main()
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment