Skip to content

Instantly share code, notes, and snippets.

@sergeant-wizard
Created July 11, 2017 09:02
Show Gist options
  • Save sergeant-wizard/235a981f7b552dc58a4757ad370d69c0 to your computer and use it in GitHub Desktop.
Save sergeant-wizard/235a981f7b552dc58a4757ad370d69c0 to your computer and use it in GitHub Desktop.
parallel communication between multiple servos
from time import sleep, time
from multiprocessing import Process, Manager
def hardware_io(action, shared_dict, write_duration=1.2, read_duration=0.4):
    """Mock one write/read round trip with a servo.

    Blocks for ``write_duration`` seconds (standing in for sending
    ``action`` to the hardware), then for ``read_duration`` seconds
    (standing in for reading the state back), and finally stores the
    current wall-clock time under ``shared_dict['state']``.

    Args:
        action: The command that would be written to the servo. The mock
            does not use it.
        shared_dict: Mutable mapping that receives the result under the
            ``'state'`` key.
        write_duration: Seconds to sleep for the mocked write.
        read_duration: Seconds to sleep for the mocked read.
    """
    sleep(write_duration)  # mock write (put action)
    sleep(read_duration)  # mock read (get state)
    shared_dict['state'] = time()
class ServoCommunicator(object):
    """Runs the I/O for a single servo in a separate process.

    The latest state is exchanged through a ``Manager`` dict so the value
    written by the I/O process is visible to the process that owns this
    object.
    """

    def __init__(self):
        # No I/O has been started yet.
        self.io_process = None
        state_store = Manager().dict()
        # Seed the state so get_state() works before any I/O completes.
        state_store['state'] = time()
        self.shared_dict = state_store

    def _wait_for_pending_io(self):
        """Block until the I/O process from the previous update exits."""
        pending = self.io_process
        if pending is None:
            return
        pending.join()

    def update(self, action):
        """Start a new I/O round trip for ``action`` and return the state.

        The previous frame's I/O process is joined first. The state is
        read right after the new process is started, without waiting for
        that new process to finish.
        """
        # wait for the IOs from the last frame to finish
        self._wait_for_pending_io()
        worker = Process(target=hardware_io, args=(action, self.shared_dict))
        self.io_process = worker
        worker.start()
        return self.get_state()

    def get_state(self):
        """Return the value currently stored under the ``'state'`` key."""
        current = self.shared_dict['state']
        return current
class Environment(object):
    """A fixed-size group of servos that are updated together."""

    # Number of ServoCommunicator instances created per environment.
    num_servos = 4

    def __init__(self):
        communicators = []
        for _ in range(self.num_servos):
            communicators.append(ServoCommunicator())
        self.servos = communicators

    def update(self, actions):
        """Send one action to each servo and collect the returned states.

        Servos and actions are paired positionally with ``zip``, so any
        surplus servos or surplus actions are ignored.

        Returns:
            A list with one ``servo.update(action)`` result per pair.
        """
        states = []
        for servo, action in zip(self.servos, actions):
            states.append(servo.update(action))
        return states
if __name__ == '__main__':
    # Demo driver: repeatedly send the same four actions, pause for one
    # second, and print the states returned for that frame. Runs until
    # the process is interrupted.
    environment = Environment()
    actions = [0.0, 1.0, 2.0, 3.0]
    while True:
        states = environment.update(actions)
        sleep(1.0)
        print(states)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment