Here we share a global data bus, which makes it possible to remove the explicit input and output arguments that the current parts require when they are added to the Vehicle. Each part runs in its own thread at its own frequency. Parts can write to and read from the data bus asynchronously, and there is no longer a global vehicle loop. There is some resemblance to ROS, where parts subscribe to topics.
There is currently no control over type safety or allowed types in the data bus, but that could easily be added.
Last active
February 6, 2023 00:50
-
-
Save DocGarbanzo/f80c0ce9a8984d9f5f8e857ffd1cb244 to your computer and use it in GitHub Desktop.
Mock up of an alternative parts and car setup for donkeycar.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections
import math
import time
from threading import Lock
from threading import Thread
# Not really needed, but a structure to combine data, its type and a timestamp
DataStruct = collections.namedtuple('DataStruct', 'data_type data time_stamp')


class DataBus:
    """Single object in the car that shares all data between parts.

    Entries live in a dict keyed by name. Only the most recent entry per
    name is kept; no history is stored.
    """

    def __init__(self):
        # Maps data_name -> DataStruct of the latest write.
        self.data_store = {}

    def write(self, data_name, data_type, data):
        """Write data into the bus, replacing any previous entry.

        Args:
            data_name: Key under which the entry is stored.
            data_type: Exact type that ``data`` is expected to have.
            data: Payload to store.

        Raises:
            TypeError: If ``type(data)`` is not exactly ``data_type``.
        """
        # Here we check types but could check more, e.g. that the type of an
        # existing entry does not change. An explicit raise is used rather
        # than ``assert`` so the check is not stripped under ``python -O``,
        # and it runs before the record is built so a bad value never gets
        # time-stamped or stored.
        if type(data) is not data_type:
            raise TypeError(f'{type(data).__name__} does not '
                            f'match {data_type.__name__}')
        # We just replace data and don't keep history, but we could also
        # keep the last n entries and tag them with an additional counter
        # which gets cycled.
        self.data_store[data_name] = DataStruct(data_type=data_type,
                                                data=data,
                                                time_stamp=time.time())

    def read(self, data_name):
        """Return the current entry for ``data_name``.

        Returns:
            The stored ``DataStruct``, or ``None`` when nothing has been
            written under that name. Never raises for a missing name.
        """
        return self.data_store.get(data_name)
class Part(object):
    """Part base class, provides asynchronous threads with individual loop
    frequencies.

    Subclasses override ``read_from_bus`` and/or ``write_to_bus``; the base
    class owns the thread and the loop timing.
    """

    def __init__(self, loop_time):
        """Create the part and its (not yet started) daemon thread.

        Args:
            loop_time: Target duration of one loop iteration in seconds.
        """
        self.data_bus = None
        self.loop_time = loop_time
        self.t = Thread(target=self.update, args=())
        self.t.daemon = True
        # NOTE: this lock belongs to this part alone; it is not shared with
        # other parts or with the data bus.
        self.lock = Lock()
        # Cleared by stop() to make the update loop exit.
        self.running = True
        self.last_time = time.time()
        self.loop_count = 0
        self.time_diff_total = 0.
        print('Created part', type(self).__name__, 'with loop time',
              self.loop_time)

    def update(self):
        """Run the read/write loop until ``stop()`` is called.

        Only needs to be implemented here; subclasses customise the two
        hooks instead.

        Raises:
            RuntimeError: If no data bus has been set.
        """
        if self.data_bus is None:
            raise RuntimeError("Need to set data bus first")
        # Start timing when the loop starts, so the gap between construction
        # and start does not distort the first iteration or the average.
        self.last_time = time.time()
        while self.running:
            # lock shared resource
            with self.lock:
                self.read_from_bus()
            # lock shared resource
            with self.lock:
                self.write_to_bus()
            now = time.time()
            time_diff = now - self.last_time
            # mechanically delay loop to match expected loop time - this is
            # not exact but approximate
            if time_diff < self.loop_time:
                time.sleep(self.loop_time - time_diff)
                now = time.time()
            self.time_diff_total += now - self.last_time
            self.last_time = now
            self.loop_count += 1

    def read_from_bus(self):
        """Hook called once per loop iteration; does nothing by default."""
        pass

    def write_to_bus(self):
        """Hook called once per loop iteration; does nothing by default."""
        pass

    def start(self):
        """Start the part's thread."""
        self.t.start()

    def set_data_bus(self, data_bus):
        """Attach the data bus this part reads from and writes to."""
        self.data_bus = data_bus

    def stop(self):
        """Ask the loop to finish and report the average loop time.

        The loop exits after its current iteration. The reported average is
        0.0 when no iteration has completed yet.
        """
        self.running = False
        # just check how exact the timing was; guard against a part that was
        # stopped before completing a single loop
        if self.loop_count:
            avg_loop_time = self.time_diff_total / self.loop_count
        else:
            avg_loop_time = 0.0
        print('Stopped part', type(self).__name__, 'with avg loop time',
              avg_loop_time)
class Odom(Part):
    """Mock odometer part: publishes a sinusoidal speed in [0, 1] and a
    distance measured in ticks."""

    def __init__(self):
        super().__init__(loop_time=0.05)
        # Running tick value; also used as the phase of the speed signal.
        self.tick = 0.

    def write_to_bus(self):
        """Publish the mocked speed and distance, then advance the tick.

        Implements the hook of the base class.
        """
        phase = self.tick - math.pi/2
        mock_speed = 0.5 + 0.5 * math.sin(phase)
        bus = self.data_bus
        bus.write('speed', float, mock_speed)
        bus.write('distance', float, self.tick)
        self.tick += 0.01
class SpeedGauge(Part):
    """Mock part to read speed and distance and print them."""

    def __init__(self):
        super().__init__(loop_time=0.2)

    def read_from_bus(self):
        """Print the current speed and distance if both are available.

        The bus returns ``None`` for names that have not been written yet.
        Parts run in independent threads, so this can happen in the first
        loop(s); the display is skipped in that case instead of failing on
        ``None.data``.
        """
        speed_entry = self.data_bus.read('speed')
        dist_entry = self.data_bus.read('distance')
        if speed_entry is None or dist_entry is None:
            return
        speed = speed_entry.data
        dist = dist_entry.data
        # Gauge display
        print(f'Speed: {speed:.4f} distance: {dist:.4f} ')
class Car:
    """Container that owns the data bus and the parts attached to it."""

    def __init__(self):
        self.parts = []
        # the car owns the data bus
        self.data_bus = DataBus()

    def add_part(self, part):
        """Attach the car's data bus to ``part`` and register the part.

        Args:
            part: Object providing ``set_data_bus``, ``start`` and ``stop``.
        """
        # here we set the bus into the part, no need for the child classes to
        # do that
        part.set_data_bus(self.data_bus)
        self.parts.append(part)

    def start(self, duration=None):
        """Start all parts and block until interrupted, then stop them.

        Args:
            duration: Optional run time in seconds. ``None`` (the default)
                runs until a ``KeyboardInterrupt``. The run ends at the first
                wake-up after the duration has elapsed, so it is approximate.
        """
        for part in self.parts:
            part.start()
        deadline = None if duration is None else time.monotonic() + duration
        try:
            # Sleep instead of spinning: a bare ``while True: pass`` keeps a
            # CPU core busy and competes with the part threads for run time.
            while deadline is None or time.monotonic() < deadline:
                time.sleep(0.1)
        except KeyboardInterrupt:
            pass
        except Exception as e:
            print(e)
        finally:
            print('Stopped car.')
            self.stop()

    def stop(self):
        """Stop every registered part, in the order they were added."""
        for part in self.parts:
            part.stop()
if __name__ == '__main__':
    # Build the car plus one data-writing and one data-reading part.
    vehicle = Car()
    odometer = Odom()
    gauge = SpeedGauge()
    # Data-writing parts have to be added ahead of data-reading parts. This
    # could be changed, but then reading parts would need some fallback to
    # deal with missing data in the initial loop(s).
    for part in (odometer, gauge):
        vehicle.add_part(part)
    vehicle.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
neat!