Skip to content

Instantly share code, notes, and snippets.

@tiagocoutinho
Last active July 26, 2021 04:56
Show Gist options
  • Save tiagocoutinho/ca768c34d32e055613d5e2e3891026e4 to your computer and use it in GitHub Desktop.
Save tiagocoutinho/ca768c34d32e055613d5e2e3891026e4 to your computer and use it in GitHub Desktop.
gevent & asyncio serial line on posix systems
import asyncio
import serial_asyncio
# Tick counter: bumped by loop() and reported by main() once the reply is in.
I = 0


async def loop():
    """Increment the module-level counter ``I`` every 0.1 s, forever.

    The coroutine never returns on its own; it runs until the task that
    wraps it is cancelled.
    """
    global I
    while True:
        await asyncio.sleep(0.1)
        I = I + 1
async def iread(reader, end=b'\n'):
    """Yield the bytes read from *reader* one at a time, up to and including *end*.

    :param reader: object exposing an awaitable ``readexactly(n)``
    :param end: single byte that terminates the stream; it is yielded too
    """
    byte = None
    while True:
        # Checked before reading so nothing is consumed past the terminator.
        if byte == end:
            return
        byte = await reader.readexactly(1)
        yield byte
async def main():
    """Query ``*IDN?`` over the serial line while a counter task keeps running.

    Starts ``loop()`` as a background task, opens ``/tmp/roadrunner`` at
    115200 baud, sends ``*IDN?`` and prints the reply byte by byte as it
    arrives. The counter task is cancelled and the serial writer closed even
    when opening the port or reading the reply raises; on success the final
    value of ``I`` is printed.
    """
    counter = asyncio.create_task(loop())
    writer = None
    try:
        reader, writer = await serial_asyncio.open_serial_connection(
            url='/tmp/roadrunner', baudrate=115200)
        writer.write(b'*IDN?\n')
        # could have used simply: print(await reader.readuntil())
        # ...but I wanted to see the stream coming in like it's 1980 and I am
        # watching War Games
        print('incoming reply: ', end='', flush=True)
        async for c in iread(reader):
            print(c.decode(), end='', flush=True)
    finally:
        # Stop counting first so the reported value matches the end of the read.
        counter.cancel()
        if writer is not None:
            writer.close()
    print('Done at {}!'.format(I))


asyncio.run(main())
# Now using a low-level file descriptor
import gevent.os
# Tick counter: bumped by loop() and printed by the script when the reply is in.
I = 0


def loop():
    """Increment the module-level counter ``I`` every 0.1 s, 10000 times."""
    global I
    remaining = 10000
    while remaining > 0:
        gevent.sleep(0.1)
        I = I + 1
        remaining -= 1
def print_idn(fd):
    """Send ``*IDN?`` on *fd* and echo the reply to stdout byte by byte.

    Reading stops once the terminating newline has been printed, or as soon
    as a read returns no data, so a closed line cannot make this spin forever.

    :param fd: file descriptor of the serial line, used with
        ``gevent.os.nb_write`` / ``gevent.os.nb_read``
    """
    gevent.os.nb_write(fd, b'*IDN?\n')
    while True:
        c = gevent.os.nb_read(fd, 1)
        if not c:
            # Empty read: end-of-file on the descriptor, nothing more to come.
            break
        print(c.decode(), end='', flush=True)
        if c == b'\n':
            break
# Open the simulated serial line read/write, asking for O_NOCTTY and
# O_NONBLOCK at open time.
flags = gevent.os.O_RDWR | gevent.os.O_NOCTTY | gevent.os.O_NONBLOCK
fd = gevent.os.open('/tmp/roadrunner', flags)
gevent.os.make_nonblocking(fd) # just sets O_NONBLOCK flag on the file descriptor
# Start the counter with gevent.spawn, then run the query from the main flow.
gevent.spawn(loop)
print_idn(fd)
# I is the counter incremented by loop(); print its value after the reply.
print('Done at {}!'.format(I))
# Now using a high-level FileObject
import io
import os
import gevent.fileobject
# Tick counter: bumped by loop() and printed by the script when the reply is in.
I = 0


def loop():
    """Bump the module-level counter ``I`` once per 0.1 s sleep, 10000 times."""
    global I
    ticks_left = 10000
    while ticks_left:
        gevent.sleep(0.1)
        I = I + 1
        ticks_left -= 1
def print_idn(f):
    """Send ``*IDN?`` through *f* and echo the reply to stdout byte by byte.

    The request is flushed so a buffered writer cannot hold it back. Reading
    stops once the terminating newline has been printed, or as soon as a read
    returns no data (end of stream), so a closed line cannot make this spin
    forever.

    :param f: binary file-like object providing ``write``, ``flush`` and
        ``read`` (e.g. an ``io.BufferedRWPair``)
    """
    f.write(b'*IDN?\n')
    f.flush()
    while True:
        c = f.read(1)
        if not c:
            # Empty read: end of stream, nothing more to come.
            break
        print(c.decode(), end='', flush=True)
        if c == b'\n':
            break
# Open the simulated serial line read/write, asking for O_NOCTTY and
# O_NONBLOCK at open time.
flags = os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK
fd = os.open('/tmp/roadrunner', flags)
# Two gevent file objects over the same descriptor, both with bufsize=0:
# one opened 'rb' for reading, one opened 'wb' for writing.
reader = gevent.fileobject.FileObject(fd, 'rb', bufsize=0)
writer = gevent.fileobject.FileObject(fd, 'wb', bufsize=0)
# Combine them into one read/write object (buffer size 1) for print_idn().
f = io.BufferedRWPair(reader, writer, 1)
# Start the counter with gevent.spawn, then run the query from the main flow.
gevent.spawn(loop)
print_idn(f)
# I is the counter incremented by loop(); print its value after the reply.
print('Done at {}!'.format(I))
import os
import pty
import tty
import time
# Path of the symlink that clients open as if it were a serial device.
name = '/tmp/roadrunner'

# Remove a symlink already present at that path before creating a new one.
if os.path.islink(name):
    os.unlink(name)

# Create the pseudo-terminal pair and put the master side into raw mode.
master, slave = pty.openpty()
tty.setraw(master)

# Publish the slave device under the well-known name.
port = os.ttyname(slave)
os.symlink(port, name)
def handle():
    """Serve requests arriving on the pty master, forever.

    Each chunk read from ``master`` (up to 1024 bytes) is upper-cased and
    stripped; ``*IDN?`` gets the identification string, anything else gets an
    error line. The reply is written one character at a time with a 0.05 s
    pause after each, to imitate a slow serial device.
    """
    while True:
        request = os.read(master, 1024)
        command = request.upper().strip()
        print('processing {!r}'.format(request))
        msg = (
            'RoadRunner, ACME Inc., v1013, 234567\n'
            if command == b'*IDN?'
            else 'ERROR: Unknown command\n'
        )
        print('start reply with {!r}... '.format(msg), end='', flush=True)
        for char in msg:
            os.write(master, char.encode())
            time.sleep(0.05)
        print('[DONE]')
# Serve until interrupted, then remove the symlink and release the pty.
print('Ready to accept requests on {}'.format(name))
try:
    handle()
except KeyboardInterrupt:
    print('Ctrl-C pressed. Bailing out')
finally:
    try:
        os.unlink(name)
    finally:
        # Close both pty ends even if removing the symlink failed.
        os.close(master)
        os.close(slave)

Usage

On one console, type:

$ python scpi_serial_server.py

This creates a simulated serial line. Its address is /tmp/roadrunner.

On another console, execute one of the two Python gevent examples:

$ python gevent_serial_filedescriptor.py
RoadRunner, ACME Inc., v1013, 234567
Done at 18!

The important thing to notice in both examples is that the X in the output Done at <X>! is greater than 0, implying that the other gevent task was able to run concurrently with the slow read of the simulated serial line.

Side note

See also this great post on serial line with python asyncio

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment