Skip to content

Instantly share code, notes, and snippets.

@standarddeviant
Last active March 24, 2022 04:53
Show Gist options
  • Save standarddeviant/a974b05cc426998c69d110f0609c0fce to your computer and use it in GitHub Desktop.
Save standarddeviant/a974b05cc426998c69d110f0609c0fce to your computer and use it in GitHub Desktop.
MicroPython IMU recording to internal flash
graph TD
    imu(Accel + Gyro<br>over I2C) --> imufetch
    imufetch[[imu_fetch_task]] --> datalist
    datalist[(Data List<br>with lock)]
    datalist --> filewrite[[file_write_task]]
    filewrite --> localfile[(local file)]

    mainstart{main_task<br>loop start}
    --> |wait on<br>record_start_event| mainspawn(SPAWN tasks<br>1. imu_fetch_task<br>2. file_write_task)
    mainspawn
    --> |wait on<br>record_stop_event| mainstop(STOP tasks<br>using local, shared<br>stop event)
    recstart --> mainspawn
    recstop --> mainstop
    mainstop --> mainstart

    btn[Button IRQ<br>with state] --> recstart[(record_start_event<br>ThreadSafeFlag)]
    btn --> recstop[(record_stop_event<br>ThreadSafeFlag)]
Loading
from machine import Pin, I2C
import network, struct, uos, ubluetooth
import uasyncio
import ntptime, time
import m5stickc
import m5stickc_lcd
from mpu6886 import MPU6886
# global hardware + state variables
# LCD driver object used by the set_display_* helpers.
lcd = m5stickc_lcd.ST7735()
# Button on GPIO 37; its IRQ toggles recording (see record_btn_handler).
main_button = Pin(37, Pin.IN)
# I2C bus 0 on GPIO 21 (SDA) / GPIO 22 (SCL) at 400 kHz, shared with the IMU.
sda = Pin(21, Pin.OUT)
scl = Pin(22, Pin.OUT)
i2c = I2C(0, sda=sda, scl=scl, freq=400000)
imu = MPU6886(i2c)
led = Pin(10, Pin.OUT)
# NOTE(review): original note reads "logic-high = 0", i.e. the LED is
# presumably active-low and writing 1 switches it off -- confirm on hardware.
led.value(1) # logic-high = 0
# Toggled by record_btn_handler on every button interrupt.
recording = False
# Not referenced anywhere else in the visible code.
led_on = False
# Flags set from the button IRQ handler and awaited by main_run.
record_start_event = uasyncio.ThreadSafeFlag()
record_stop_event = uasyncio.ThreadSafeFlag()
# timing constants
# Sleep between IMU samples in imu_fetch_main, in milliseconds.
# The trailing "20" is presumably an earlier setting -- TODO confirm.
sensor_period_ms = 50 # 20
# The two LED timing constants below are not referenced in the visible code.
led_period_ms = 5000
led_duty_ms = 100
def set_display_before_recording():
    """Show the idle prompt on the LCD and write 1 to the LED pin.

    Turns the backlight on, clears the screen, draws the prompt one word
    group per text row (rows are 10 pixels apart, starting at y=10) and
    pushes the frame to the display.
    """
    m5stickc.lcd_backlight_power(True)
    lcd.fill(0)
    row_y = 10
    for line in ('Click', 'button', 'to start', 'recording', '...'):
        lcd.text(line, 4, row_y, 0xffff)
        row_y += 10
    lcd.show()
    led.value(1)
def set_display_active_recording():
    """Switch the LCD backlight off while a recording is in progress."""
    # An on-screen 'recording is active ...' banner (fill 0x03a0 plus text
    # rows) used to be drawn here; it is disabled, only the backlight changes.
    m5stickc.lcd_backlight_power(False)
def record_btn_handler(_pin):
    """Button IRQ callback: toggle the recording state and notify main_run.

    Args:
        _pin: the pin object supplied by the IRQ machinery (unused).
    """
    global recording
    recording = not recording
    if recording:
        # idle -> recording
        set_display_active_recording()
        record_start_event.set()
    else:
        # recording -> idle
        set_display_before_recording()
        record_stop_event.set()
def make_data_file():
    """Create ``/data`` if it is missing and open a new timestamped file.

    The file is named ``/data/<timestamp>.mpk``. The timestamp comes from
    ``ntptime.time()``; if that call fails with ``OSError`` (for example
    because no network link is up) the local clock is used instead, so a
    recording can still be started offline.

    Returns:
        A file object opened in binary write mode. The caller owns it and
        must close it.
    """
    try:
        uos.stat('/data')
    except OSError:
        # stat failed, so the directory is taken to be missing
        uos.mkdir('/data')
    try:
        stamp = ntptime.time()
    except OSError:
        # NTP lookup needs the network; do not let that abort the recording
        stamp = time.time()
    fpath = f'/data/{stamp}.mpk'
    return open(fpath, 'wb')
def make_sensor_msgpack_buf(t, a, g):
    """Encode one IMU sample as a 36-byte msgpack array.

    Byte layout (all multi-byte values big-endian):
        0x97                 array header, 7 elements
        0xce + uint32        timestamp ``t``
        6 x (0xca + float32) accel x, y, z followed by gyro x, y, z

    Args:
        t: timestamp that fits in an unsigned 32-bit integer.
        a: iterable of exactly three accelerometer values.
        g: iterable of exactly three gyroscope values.

    Returns:
        The encoded sample as ``bytes``.

    Raises:
        ValueError: if ``a`` or ``g`` does not hold exactly three items.
    """
    ax, ay, az = a
    gx, gy, gz = g
    # One pack call instead of seven packs plus concatenations: same bytes,
    # far fewer temporary objects per sample.
    return struct.pack(
        '>BBIBfBfBfBfBfBf',
        0x97, 0xce, t,
        0xca, ax, 0xca, ay, 0xca, az,
        0xca, gx, 0xca, gy, 0xca, gz
    )
async def imu_fetch_main(imu, period_ms, out_list, out_lock, stop_event):
    """Sample the IMU repeatedly and queue each encoded sample.

    Every pass reads the tick counter, the accelerometer and the gyro,
    encodes them with make_sensor_msgpack_buf and appends the result to
    ``out_list`` while holding ``out_lock``. The loop ends after the first
    pass on which ``stop_event`` is set; otherwise it sleeps ``period_ms``
    milliseconds before sampling again.

    Args:
        imu: sensor object providing getAccelData() and getGyroData().
        period_ms: sleep between samples, in milliseconds.
        out_list: list that receives the encoded samples.
        out_lock: async lock guarding ``out_list``.
        stop_event: event whose is_set() ends the loop.
    """
    keep_sampling = True
    while keep_sampling:
        stamp_ms = time.ticks_ms()
        accel = imu.getAccelData()
        gyro = imu.getGyroData()
        packet = make_sensor_msgpack_buf(stamp_ms, accel, gyro)
        async with out_lock:
            out_list.append(packet)
            print('wee')
        if stop_event.is_set():
            keep_sampling = False
        else:
            # The sleep only approximates the sample period.
            # TODO - make this interrupt driven for tighter timing
            await uasyncio.sleep_ms(period_ms)
    print('imu_fetch_main ~ DONEZO!')
# Minimum time between explicit flushes of the data file, in milliseconds.
FILE_FLUSH_MS = 2000
async def file_write_main(in_list, in_lock, stop_event):
    """Move queued samples from ``in_list`` into a new data file.

    Every 50 ms the whole queue is taken (under ``in_lock``) and written
    out, so the writer cannot fall behind the producer. Once ``stop_event``
    is set, everything queued up to that point is written before the file
    is closed. The file is closed even if a write raises.

    Args:
        in_list: list of bytes-like samples to write; emptied in place.
        in_lock: async lock guarding ``in_list``.
        stop_event: event whose is_set() ends the loop.
    """
    f = make_data_file()
    try:
        last_flush = time.ticks_ms()
        while True:
            # Sample the stop flag before draining so the final pass still
            # writes everything that was queued when stop was requested.
            stopping = stop_event.is_set()
            async with in_lock:
                pending = in_list[:]
                in_list.clear()
            # Write outside the lock so the producer is not held up by I/O.
            for b in pending:
                f.write(b)
                print('woo')
            if stopping:
                break
            now = time.ticks_ms()
            # ticks_ms() wraps around; ticks_diff gives the correct interval.
            if time.ticks_diff(now, last_flush) > FILE_FLUSH_MS:
                f.flush()
                last_flush = now
            await uasyncio.sleep_ms(50)
    finally:
        f.close()
    print('file_write_main ~ DONEZO!')
async def main_run():
    """Run recording sessions forever, one per start/stop button cycle.

    Each cycle waits for ``record_start_event``, spawns the IMU fetch task
    and the file write task with a shared per-session stop event, waits for
    ``record_stop_event``, signals the tasks to stop and then waits until
    both have actually finished before accepting the next start.
    """
    while True:
        # wait for record_start_event (of type uasyncio.ThreadSafeFlag)
        await record_start_event.wait()
        # fresh Event per session (starts cleared) to stop both sub-tasks
        local_stop_event = uasyncio.Event()
        imu_fetch_task = uasyncio.create_task(
            imu_fetch_main(imu, sensor_period_ms, data_list, data_lock, local_stop_event)
        )
        file_write_task = uasyncio.create_task(
            file_write_main(data_list, data_lock, local_stop_event)
        )
        # wait for record_stop_event (ThreadSafeFlag)
        await record_stop_event.wait()
        # inform two sub-tasks to stop
        local_stop_event.set()
        # gather takes the awaitables as separate arguments and must be
        # awaited; return_exceptions keeps one failed task from ending
        # this loop.
        results = await uasyncio.gather(
            imu_fetch_task, file_write_task, return_exceptions=True
        )
        for outcome in results:
            if isinstance(outcome, Exception):
                print('recording task failed:', repr(outcome))
    # not reachable: the loop above has no exit
    print('main_run ~ DONEZO (this should not happen...)')
# start main 'setup'
# NOTE(review): utime is not used anywhere in the visible code.
import utime
# NOTE(review): the three names below are not read anywhere in the visible
# code (file_write_main uses its own local `f`) -- presumably leftovers.
f = None
last_sensor_fetch_ms = time.ticks_ms()
last_led_on_ms = time.ticks_ms()
# Show the idle prompt before any button press.
set_display_before_recording()
# Sample queue shared by imu_fetch_main (producer) and file_write_main
# (consumer), together with the async lock both take around list access.
data_list = []
data_lock = uasyncio.Lock()
# set the IRQ with the button to communicate via record_start_event and record_stop_event
main_button.irq(handler=record_btn_handler, trigger=Pin.IRQ_RISING) #, *, priority=1, wake=None, hard=False)
# Enter the event loop; main_run loops forever, so this call is not expected
# to return.
uasyncio.run(main_run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment