graph TD
%% Data path: IMU samples go from imu_fetch_task into a locked list,
%% which file_write_task drains into a local file.
imu(Accel + Gyro<br>over I2C) --> imufetch
imufetch[[imu_fetch_task]] --> datalist
datalist[(Data List<br>with lock)]
datalist --> filewrite[[file_write_task]]
filewrite --> localfile[(local file)]
%% Control loop: main_task waits for the start event, spawns both tasks,
%% then waits for the stop event and stops them.
mainstart{main_task<br>loop start}
--> |wait on<br>record_start_event| mainspawn(SPAWN tasks<br>1. imu_fetch_task<br>2. file_write_task)
mainspawn
--> |wait on<br>record_stop_event| mainstop(STOP tasks<br>using local, shared<br>stop event)
%% The two events feed the matching main_task steps; after stopping,
%% the loop returns to its start.
recstart --> mainspawn
recstop --> mainstop
mainstop --> mainstart
%% The button IRQ is the source of both ThreadSafeFlag events.
btn[Button IRQ<br>with state] --> recstart[(record_start_event<br>ThreadSafeFlag)]
btn --> recstop[(record_stop_event<br>ThreadSafeFlag)]
Last active
March 24, 2022 04:53
-
-
Save standarddeviant/a974b05cc426998c69d110f0609c0fce to your computer and use it in GitHub Desktop.
MicroPython IMU recording to internal flash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from machine import Pin, I2C | |
import network, struct, uos, ubluetooth | |
import uasyncio | |
import ntptime, time | |
import m5stickc | |
import m5stickc_lcd | |
from mpu6886 import MPU6886 | |
# global hardware + state variables
lcd = m5stickc_lcd.ST7735()
# Button whose rising-edge IRQ is bound to record_btn_handler further down.
main_button = Pin(37, Pin.IN)
# I2C bus used by the MPU6886 IMU driver.
sda = Pin(21, Pin.OUT)
scl = Pin(22, Pin.OUT)
i2c = I2C(0, sda=sda, scl=scl, freq=400000)
imu = MPU6886(i2c)
led = Pin(10, Pin.OUT)
# Original note: "logic-high = 0".
# NOTE(review): presumably the LED is active-low, so writing 1 turns it off
# -- confirm against the board schematic.
led.value(1)
# Toggled by record_btn_handler on every button IRQ.
recording = False
# NOTE(review): led_on is not referenced anywhere else in the visible code.
led_on = False
# Set by record_btn_handler, awaited by main_run.
record_start_event = uasyncio.ThreadSafeFlag()
record_stop_event = uasyncio.ThreadSafeFlag()
# timing constants
# Passed to imu_fetch_main as its period_ms; the trailing "20" in the original
# comment is presumably an earlier value.
sensor_period_ms = 50 # 20
# NOTE(review): led_period_ms / led_duty_ms are not referenced in the visible code.
led_period_ms = 5000
led_duty_ms = 100
def set_display_before_recording():
    """Show the idle prompt on the LCD and drive the LED pin high.

    Turns the backlight on, clears the screen, prints the prompt one word
    per text row (10 px apart, starting at y=10), pushes the frame to the
    display and finally writes 1 to the LED pin.
    """
    m5stickc.lcd_backlight_power(True)
    lcd.fill(0)
    row_y = 10
    for line in ('Click', 'button', 'to start', 'recording', '...'):
        lcd.text(line, 4, row_y, 0xffff)
        row_y += 10
    lcd.show()
    led.value(1)
def set_display_active_recording():
    """Put the display into its "recording" state by switching the backlight off."""
    m5stickc.lcd_backlight_power(False)
    # Disabled alternative, kept for reference: instead of blanking the
    # backlight, fill the screen and print a "recording is active" message.
    #lcd.fill(0x03a0);
    #record_strings = ('recording', 'is', 'active', '...')
    #for ix, s in enumerate(record_strings):
    #    lcd.text(s, 4, 10+(10*ix), 0xffff)
    #lcd.show()
def record_btn_handler(_pin):
    """Button IRQ callback: flip the recording state and notify main_run.

    Each invocation toggles the module-level ``recording`` flag, updates the
    display for the new state and sets the matching ThreadSafeFlag
    (``record_start_event`` when recording begins, ``record_stop_event``
    when it ends).

    _pin: the argument supplied by the IRQ machinery; not used.
    """
    global recording
    recording = not recording
    if recording:
        # idle -> recording
        set_display_active_recording()
        record_start_event.set()
    else:
        # recording -> idle
        set_display_before_recording()
        record_stop_event.set()
def make_data_file():
    """Create and open a new binary data file under ``/data``.

    The ``/data`` directory is created on first use. The file is named
    ``/data/<timestamp>.mpk`` where the timestamp comes from an NTP query.
    If that query fails (no network, DNS failure, timeout -> OSError) the
    local clock is used instead, so a recording can still start offline.

    Returns:
        The file object, opened in ``'wb'`` mode. The caller must close it.
    """
    try:
        uos.stat('/data')
    except OSError:
        # stat failed -> directory does not exist yet
        uos.mkdir('/data')
    try:
        stamp = ntptime.time()
    except OSError:
        # ntptime.time() talks to a network server; do not let a missing
        # connection prevent the recording from being stored.
        # NOTE(review): if the RTC was never set, local timestamps may repeat
        # across reboots and 'wb' would overwrite an older file -- confirm
        # whether that matters for this device.
        stamp = time.time()
    return open(f'/data/{stamp}.mpk', 'wb')
def make_sensor_msgpack_buf(t, a, g):
    """Encode one IMU sample as a 36-byte MessagePack array.

    Layout (all big-endian):
        0x97                      array header, 7 elements
        0xce + uint32             timestamp ``t``
        0xca + float32  (x3)      accelerometer x, y, z from ``a``
        0xca + float32  (x3)      gyroscope x, y, z from ``g``

    Args:
        t: unsigned 32-bit integer timestamp.
        a: iterable of exactly three numbers (accelerometer axes).
        g: iterable of exactly three numbers (gyroscope axes).

    Returns:
        ``bytes`` of length 36.

    Raises:
        ValueError: if ``a`` or ``g`` does not hold exactly three items.
    """
    ax, ay, az = a
    gx, gy, gz = g
    # One pack call instead of seven packs, seven bytearray copies and seven
    # concatenations: this runs once per sample on a small heap.
    return struct.pack(
        '>BBIBfBfBfBfBfBf',
        0x97, 0xce, t,
        0xca, ax, 0xca, ay, 0xca, az,
        0xca, gx, 0xca, gy, 0xca, gz,
    )
async def imu_fetch_main(imu, period_ms, out_list, out_lock, stop_event):
    """Sample the IMU periodically and queue each encoded sample.

    Every iteration reads the tick counter, the accelerometer and the
    gyroscope, encodes them with make_sensor_msgpack_buf and appends the
    result to ``out_list`` while holding ``out_lock``. After each sample the
    stop event is checked; if it is set the task ends, otherwise it sleeps
    ``period_ms`` before the next sample.

    Args:
        imu: object providing getAccelData() and getGyroData().
        period_ms: sleep time between samples, in milliseconds.
        out_list: list that receives the encoded samples.
        out_lock: async lock guarding ``out_list``.
        stop_event: object whose is_set() signals that sampling should end.
    """
    keep_sampling = True
    while keep_sampling:
        stamp_ms = time.ticks_ms()
        packed = make_sensor_msgpack_buf(
            stamp_ms, imu.getAccelData(), imu.getGyroData()
        )
        async with out_lock:
            out_list.append(packed)
            print('wee')
        if stop_event.is_set():
            keep_sampling = False
        else:
            # Fixed sleep only approximates the sample period.
            # TODO - make this interrupt driven for tighter timing
            await uasyncio.sleep_ms(period_ms)
    print('imu_fetch_main ~ DONEZO!')
# Minimum time between explicit flushes of the data file, in milliseconds.
FILE_FLUSH_MS = 2000


async def file_write_main(in_list, in_lock, stop_event):
    """Drain queued sample buffers into a freshly created data file.

    On every wake-up ALL records currently in ``in_list`` are taken out under
    ``in_lock`` and written, so the queue cannot grow without bound when the
    producer is faster than this task's 50 ms polling interval. The file is
    flushed at most every FILE_FLUSH_MS. When ``stop_event`` is set, whatever
    is queued at that moment is still written before the file is closed, so
    no recorded samples are dropped or left behind for the next recording.

    Args:
        in_list: list of bytes-like records produced by the sampling task.
        in_lock: async lock guarding ``in_list``.
        stop_event: object whose is_set() signals that writing should end.
    """
    f = make_data_file()
    last_flush = time.ticks_ms()
    try:
        while True:
            # Read the stop flag BEFORE draining: everything queued up to
            # this point is then guaranteed to reach the file.
            stopping = stop_event.is_set()
            # Hold the lock only long enough to take ownership of the batch;
            # the (slow) flash writes happen outside of it.
            async with in_lock:
                batch = in_list[:]
                in_list.clear()
            for rec in batch:
                f.write(rec)
                print('woo')
            if stopping:
                break
            now = time.ticks_ms()
            # ticks_ms() wraps around, so compare with ticks_diff
            if time.ticks_diff(now, last_flush) > FILE_FLUSH_MS:
                f.flush()
                last_flush = now
            await uasyncio.sleep_ms(50)
    finally:
        # Close even if a write fails or the task is cancelled.
        f.close()
    print('file_write_main ~ DONEZO!')
async def main_run():
    """Top-level control loop: start and stop one recording per button cycle.

    Each cycle:
      1. waits for ``record_start_event``;
      2. spawns imu_fetch_main and file_write_main, sharing ``data_list``,
         ``data_lock`` and a fresh per-recording stop Event;
      3. waits for ``record_stop_event`` and sets the shared stop Event;
      4. waits until both tasks have actually finished, reports any failure,
         and empties ``data_list`` so nothing leaks into the next recording.
    """
    while True:
        await record_start_event.wait()

        # A new Event starts out cleared; it tells both sub-tasks to stop.
        local_stop_event = uasyncio.Event()

        imu_fetch_task = uasyncio.create_task(
            imu_fetch_main(imu, sensor_period_ms, data_list, data_lock, local_stop_event)
        )
        file_write_task = uasyncio.create_task(
            file_write_main(data_list, data_lock, local_stop_event)
        )

        await record_stop_event.wait()
        local_stop_event.set()

        # gather() takes the awaitables as separate arguments and must be
        # awaited, otherwise the next recording could start while the
        # previous file is still open. return_exceptions=True keeps this
        # loop alive if one of the tasks failed.
        results = await uasyncio.gather(
            imu_fetch_task, file_write_task, return_exceptions=True
        )
        for res in results:
            if isinstance(res, BaseException):
                print('recording task failed:', repr(res))

        # The sampler may append a sample after the writer has closed its
        # file; drop it so it does not end up in the next recording.
        data_list.clear()
    # Unreachable in normal operation; kept as a tripwire.
    print('main_run ~ DONEZO (this should not happen...)')
# start main 'setup'
# NOTE(review): utime, f, last_sensor_fetch_ms and last_led_on_ms are not
# referenced anywhere else in the visible code.
import utime
f = None
last_sensor_fetch_ms = time.ticks_ms()
last_led_on_ms = time.ticks_ms()
# Show the idle prompt before the first button press.
set_display_before_recording()
# Queue shared by imu_fetch_main (appends) and file_write_main (pops), plus
# the lock both tasks take around it; main_run passes both to the tasks.
data_list = []
data_lock = uasyncio.Lock()
# set the IRQ with the button to communicate via record_start_event and record_stop_event
main_button.irq(handler=record_btn_handler, trigger=Pin.IRQ_RISING) #, *, priority=1, wake=None, hard=False)
# Hand control to the scheduler; main_run loops forever.
uasyncio.run(main_run())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment