Created
October 18, 2017 05:25
-
-
Save wroscoe/d7005216ab36cd4fa6ae0a5b428cbc1f to your computer and use it in GitHub Desktop.
Drive function used on donkey car for avc race
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def drive(cfg, model_path=None, use_joystick=False): | |
''' | |
Construct a working robotic vehicle from many parts. | |
Each part runs as a job in the Vehicle loop, calling either | |
it's run or run_threaded method depending on the constructor flag `threaded`. | |
All parts are updated one after another at the framerate given in | |
cfg.DRIVE_LOOP_HZ assuming each part finishes processing in a timely manner. | |
Parts may have named outputs and inputs. The framework handles passing named outputs | |
to parts requesting the same named input. | |
''' | |
#Initialize car | |
V = dk.vehicle.Vehicle() | |
cam = dk.parts.PiCamera(resolution=cfg.CAMERA_RESOLUTION) | |
V.add(cam, outputs=['cam/image_array'], threaded=True) | |
if use_joystick or cfg.USE_JOYSTICK_AS_DEFAULT: | |
#modify max_throttle closer to 1.0 to have more power | |
#modify steering_scale lower than 1.0 to have less responsive steering | |
ctr = dk.parts.JoystickController(max_throttle=cfg.JOYSTICK_MAX_THROTTLE, | |
steering_scale=cfg.JOYSTICK_STEERING_SCALE, | |
auto_record_on_throttle=cfg.AUTO_RECORD_ON_THROTTLE) | |
else: | |
#This web controller will create a web server that is capable | |
#of managing steering, throttle, and modes, and more. | |
ctr = dk.parts.LocalWebController() | |
V.add(ctr, | |
inputs=['cam/image_array'], | |
outputs=['user/angle', 'user/throttle', 'user/mode', 'recording'], | |
threaded=True) | |
#See if we should even run the pilot module. | |
#This is only needed because the part run_contion only accepts boolean | |
def pilot_condition(mode): | |
if mode == 'user': | |
return False | |
else: | |
return True | |
pilot_condition_part = dk.parts.Lambda(pilot_condition) | |
V.add(pilot_condition_part, inputs=['user/mode'], outputs=['run_pilot']) | |
#Run the pilot if the mode is not user. | |
kl = dk.parts.KerasCategorical() | |
#kl = dk.parts.KerasLinear() | |
if model_path: | |
kl.load(model_path) | |
V.add(kl, inputs=['cam/image_array'], | |
outputs=['pilot/angle', 'pilot/throttle'], | |
run_condition='run_pilot') | |
#Choose what inputs should change the car. | |
def drive_mode(mode, | |
user_angle, user_throttle, | |
pilot_angle, pilot_throttle): | |
if mode == 'user': | |
return user_angle, user_throttle | |
elif mode == 'local_angle': | |
return pilot_angle, user_throttle | |
else: | |
return pilot_angle, pilot_throttle | |
drive_mode_part = dk.parts.Lambda(drive_mode) | |
V.add(drive_mode_part, | |
inputs=['user/mode', 'user/angle', 'user/throttle', | |
'pilot/angle', 'pilot/throttle'], | |
outputs=['angle', 'throttle']) | |
def multiply_angle(angle): | |
angle = angle*1.2 | |
return angle | |
multiply_angle_part = dk.parts.Lambda(multiply_angle) | |
V.add(multiply_angle_part, | |
inputs=['angle'], | |
outputs=['angle'], | |
run_condition='run_pilot') | |
class SmoothAngle: | |
""" | |
Wraps a function into a donkey part. | |
""" | |
def __init__(self, factor=2): | |
""" | |
Accepts the function to use. | |
""" | |
self.factor = factor | |
self.last = 0.0 | |
def run(self, current): | |
new = (current*self.factor+self.last)/(self.factor+1) | |
self.last = new | |
return new | |
def shutdown(self): | |
return | |
smooth_angle_part = SmoothAngle(factor=3) | |
V.add(smooth_angle_part, | |
inputs=['angle'], | |
outputs=['angle'], | |
run_condition='run_pilot') | |
def scale_throttle(throttle, angle): | |
throttle = throttle * (1.0 - (abs(angle) * .6)) | |
return throttle | |
scale_throttle_part = dk.parts.Lambda(scale_throttle) | |
V.add(scale_throttle_part, | |
inputs=['throttle', 'angle'], | |
outputs=['throttle'], | |
run_condition='run_pilot') | |
steering_controller = dk.parts.PCA9685(cfg.STEERING_CHANNEL) | |
steering = dk.parts.PWMSteering(controller=steering_controller, | |
left_pulse=cfg.STEERING_LEFT_PWM, | |
right_pulse=cfg.STEERING_RIGHT_PWM) | |
throttle_controller = dk.parts.PCA9685(cfg.THROTTLE_CHANNEL) | |
throttle = dk.parts.PWMThrottle(controller=throttle_controller, | |
max_pulse=cfg.THROTTLE_FORWARD_PWM, | |
zero_pulse=cfg.THROTTLE_STOPPED_PWM, | |
min_pulse=cfg.THROTTLE_REVERSE_PWM) | |
V.add(steering, inputs=['angle']) | |
V.add(throttle, inputs=['throttle']) | |
#add tub to save data | |
inputs=['cam/image_array', | |
'user/angle', 'user/throttle', | |
'user/mode'] | |
types=['image_array', | |
'float', 'float', | |
'str'] | |
th = dk.parts.TubHandler(path=cfg.DATA_PATH) | |
tub = th.new_tub_writer(inputs=inputs, types=types) | |
V.add(tub, inputs=inputs, run_condition='recording') | |
#run the vehicle for 20 seconds | |
V.start(rate_hz=cfg.DRIVE_LOOP_HZ, | |
max_loop_count=cfg.MAX_LOOPS) | |
print("You can now go to <your pi ip address>:8887 to drive your car.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment