Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active September 11, 2025 19:33
Show Gist options
  • Save anecdata/d65c5040bee6d4d99c666c8d1a0e9ad3 to your computer and use it in GitHub Desktop.
Save anecdata/d65c5040bee6d4d99c666c8d1a0e9ad3 to your computer and use it in GitHub Desktop.
CircuitPython asyncio on RP2350B with 2 busio UARTS + 6 PIO UARTs
# SPDX-FileCopyrightText: 2025 anecdata
# SPDX-License-Identifier: MIT
import time
import random
import storage
import board
import busio
import asyncio
import adafruit_pio_uart
# RP2xx0 UART test
# Pimoroni pico lipo 2XL W w/ Pimoroni PGA2350 firmware
# busio sw buffer 8 bytes
# busio default receiver_buffer_size 64 bytes (max 65535 bytes)
# raspberrypi hw uart rx buffer 32 bytes
# PIO uart rx buffer 8 bytes
# Size in bytes of each chunk written and of each read request.
MAXBUF = 8
# Minimum delay in seconds between two writes on the same UART;
# awrite() adds random.random() seconds on top of this for every cycle.
MIN_INTERVAL = 2.5
# Volume label of the root filesystem; write() embeds characters 2..4 of it
# in every chunk (the label is expected to be a unique CIRCUITPY re-name).
label = storage.getmount("/").label
def write(t, u):
    """Build one test chunk and send it on UART ``u``, logging before and after.

    Chunk layout (8 bytes when ``t`` is a single digit and the label has at
    least 5 characters, plus padding when MAXBUF > 8):
      * UART number ``t`` as text,
      * characters 2..4 of the volume ``label``,
      * monotonic time in whole seconds modulo 10_000, zero-padded to 4 digits,
      * ``'x'`` repeated ``MAXBUF - 8`` times.

    Args:
        t: UART index; used in the payload and in the log lines.
        u: UART-like object providing ``write(buffer)``.

    Returns:
        None. The value returned by ``u.write`` is only printed.
    """
    stamp = f"{str(round(time.monotonic()) % 10_000):0>4}"
    padding = "x" * (MAXBUF - 8)
    # Encode explicitly: bytearray.extend(str) only works where str exposes
    # the buffer protocol (CircuitPython/MicroPython) and raises TypeError on
    # CPython. Encoding once yields the same bytes and runs on both.
    m = f"{t}{label[2:5]}{stamp}{padding}".encode()
    print(f"{time.monotonic():.3f}s uart={t} writing {bytes(m)}", end=" ")
    written = u.write(m)
    print(f"{time.monotonic():.3f}s uart={t} {written=} {bytes(m)}")
async def awrite(t, u):
    """Writer task: send one chunk on UART ``u``, pause, and repeat forever.

    The pause is MIN_INTERVAL seconds plus a fresh ``random.random()`` value
    drawn after every write.

    Args:
        t: UART index, passed through to ``write`` and used in the log line.
        u: UART object handed to ``write``.
    """
    print(f"WRITE uart={t} started")
    while True:
        write(t, u)
        pause = MIN_INTERVAL + random.random()
        await asyncio.sleep(pause)
def read(t, u):
    """Read up to MAXBUF bytes from UART ``u`` and log them with timestamps.

    Args:
        t: UART index; used only in the log lines.
        u: UART-like object providing ``in_waiting`` and ``read(nbytes)``.

    Returns:
        None. The received bytes are only printed.
    """
    print(f"{time.monotonic():.3f}s uart={t} {u.in_waiting=}", end=" ")
    m = u.read(MAXBUF)
    # busio.UART.read is documented to return None on timeout; bytes(None)
    # would raise TypeError and kill the reader task, so log an empty read.
    data = bytes(m) if m is not None else b""
    print(f"{time.monotonic():.3f}s uart={t} read {data}")
async def aread(t, u):
    """Reader task: poll UART ``u`` forever and drain it whenever data is waiting.

    Control is handed back to the scheduler after every poll, whether or not
    anything was read.

    Args:
        t: UART index, passed through to ``read`` and used in the log line.
        u: UART object providing ``in_waiting``; handed to ``read``.
    """
    print(f"READ uart={t} started")
    while True:
        pending = u.in_waiting
        if pending:
            read(t, u)
        # Zero-length sleep: yield so the other reader/writer tasks can run.
        await asyncio.sleep(0)
async def main():
    """Create every UART, start one reader and one writer task per UART, then idle.

    Notes carried over from the original script:
      * RP2350B non-wifi has enough state machines & pins for 6 PIO UARTs
        with flow control and 2 busio UARTs with [optional] flow control.
      * RP2040: 8 PIO state machines; 3.5 UARTs w/ wifi, 4 UARTs w/o wifi UF2.
      * RP2350: 12 PIO state machines; 5.5 UARTs w/ wifi, 6 UARTs w/o wifi UF2.
    """

    def pio_uart(tx, rx, cts, rts):
        # Hardware flow control is used b/c PIO buffers are very small.
        # The literal positional arguments are exactly those of the original
        # calls (presumably baudrate, bits, parity, stop, timeout, and the
        # last two presumably cts then rts -- confirm against
        # adafruit_pio_uart.UART).
        return adafruit_pio_uart.UART(tx, rx, 9600, 8, None, 1, 1, cts, rts)

    uarts = [
        # PIO UARTs
        pio_uart(board.GP0, board.GP1, board.GP2, board.GP3),
        pio_uart(board.GP4, board.GP5, board.GP6, board.GP7),
        pio_uart(board.GP8, board.GP9, board.GP10, board.GP11),
        pio_uart(board.GP12, board.GP13, board.GP14, board.GP15),
        pio_uart(board.GP16, board.GP17, board.GP18, board.GP19),
        pio_uart(board.GP20, board.GP21, board.GP26, board.GP27),
        # GP >=32: ValueError: program[6] waits on input outside of count
        # https://github.com/adafruit/circuitpython/issues/10619
        #
        # busio UARTs: 2 UART peripherals; hardware flow control not typically
        # needed because of software receiver buffer.
        # peripheral 0 (optional: cts=board.GP34, rts=board.GP35,
        # receiver_buffer_size=MAXBUF)
        busio.UART(board.GP32, board.GP33),
        # peripheral 1 (optional: cts=board.GP38, rts=board.GP39,
        # receiver_buffer_size=MAXBUF)
        busio.UART(board.GP36, board.GP37),
    ]

    tasks = []
    for taskno, uart in enumerate(uarts):
        # Drop anything received before the tasks start.
        uart.reset_input_buffer()
        tasks.append(asyncio.create_task(aread(taskno, uart)))
        tasks.append(asyncio.create_task(awrite(taskno, uart)))
    print(f"created {len(tasks)} tasks")

    # Keep main() alive forever while yielding to the reader/writer tasks.
    while True:
        await asyncio.sleep(0)
# Script entry point: pause first so the serial console can attach after a
# reset, then run main() on the asyncio event loop (main() loops forever).
time.sleep(3) # wait for serial after reset
asyncio.run(main())
@anecdata
Copy link
Author

anecdata commented Sep 11, 2025

Choosing pins for PIO can be challenging. Each PIO block (3 in RP2350) has 4 state machines. A bi-directional UART uses 2 state machines. Each PIO block can connect to a set of 30 GPIO, which must be within a GPIO 'bank' of 32 consecutive pins. So with multiple UARTs, any 2 UARTs in the same PIO block (presumably init'd sequentially) should use pins that are relatively close together (not in different 'banks'). See: adafruit/circuitpython#10627

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment