Last active
May 23, 2021 09:49
-
-
Save tawnkramer/bce35b848692501a77c94092d019f97e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Usage: | |
simple_cv_racer.py --name=your_name | |
Options: | |
-h --help Show this screen. | |
""" | |
import os | |
import time | |
from docopt import docopt | |
import numpy as np | |
import cv2 | |
from simple_pid import PID | |
import donkeycar as dk | |
from donkeycar.parts.datastore import TubHandler | |
class LineFollower: | |
''' | |
OpenCV based controller | |
This controller takes a horizontal slice of the image at a set Y coordinate. | |
Then it converts to HSV and does a color thresh hold to find the yellow pixels. | |
It does a histogram to find the pixel of maximum yellow. Then is uses that iPxel | |
to guid a PID controller which seeks to maintain the max yellow at the same point | |
in the image. | |
''' | |
def __init__(self): | |
self.vert_scan_y = 60 # num pixels from the top to start horiz scan | |
self.vert_scan_height = 10 # num pixels high to grab from horiz scan | |
self.color_thr_low = np.asarray((0, 50, 50)) # hsv dark yellow | |
self.color_thr_hi = np.asarray((50, 255, 255)) # hsv light yellow | |
self.target_pixel = None # of the N slots above, which is the ideal relationship target | |
self.steering = 0.0 # from -1 to 1 | |
self.throttle = 0.15 # from -1 to 1 | |
self.recording = False # Set to true if desired to save camera frames | |
self.delta_th = 0.1 # how much to change throttle when off | |
self.throttle_max = 0.3 | |
self.throttle_min = 0.15 | |
self.pid_st = PID(Kp=-0.01, Ki=0.00, Kd=-0.001) | |
def get_i_color(self, cam_img): | |
''' | |
get the horizontal index of the color at the given slice of the image | |
input: cam_image, an RGB numpy array | |
output: index of max color, value of cumulative color at that index, and mask of pixels in range | |
''' | |
# take a horizontal slice of the image | |
iSlice = self.vert_scan_y | |
scan_line = cam_img[iSlice : iSlice + self.vert_scan_height, :, :] | |
# convert to HSV color space | |
img_hsv = cv2.cvtColor(scan_line, cv2.COLOR_RGB2HSV) | |
# make a mask of the colors in our range we are looking for | |
mask = cv2.inRange(img_hsv, self.color_thr_low, self.color_thr_hi) | |
# which index of the range has the highest amount of yellow? | |
hist = np.sum(mask, axis=0) | |
max_yellow = np.argmax(hist) | |
return max_yellow, hist[max_yellow], mask | |
def run(self, cam_img): | |
''' | |
main runloop of the CV controller | |
input: cam_image, an RGB numpy array | |
output: steering, throttle, and recording flag | |
''' | |
max_yellow, confidense, mask = self.get_i_color(cam_img) | |
conf_thresh = 0.001 | |
if self.target_pixel is None: | |
# Use the first run of get_i_color to set our relationship with the yellow line. | |
# You could optionally init the target_pixel with the desired value. | |
self.target_pixel = max_yellow | |
# this is the target of our steering PID controller | |
self.pid_st.setpoint = self.target_pixel | |
elif confidense > conf_thresh: | |
# invoke the controller with the current yellow line position | |
# get the new steering value as it chases the ideal | |
self.steering = self.pid_st(max_yellow) | |
# slow down linearly when away from ideal, and speed up when close | |
if abs(max_yellow - self.target_pixel) > 10: | |
if self.throttle > self.throttle_min: | |
self.throttle -= self.delta_th | |
else: | |
if self.throttle < self.throttle_max: | |
self.throttle += self.delta_th | |
# show some diagnostics | |
self.debug_display(cam_img, mask, max_yellow, confidense) | |
return self.steering, self.throttle, self.recording | |
def debug_display(self, cam_img, mask, max_yellow, confidense): | |
''' | |
composite mask on top the original image. | |
show some values we are using for control | |
''' | |
mask_exp = np.stack((mask,)*3, axis=-1) | |
iSlice = self.vert_scan_y | |
img = np.copy(cam_img) | |
img[iSlice : iSlice + self.vert_scan_height, :, :] = mask_exp | |
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) | |
display_str = [] | |
display_str.append("STEERING:{:.1f}".format(self.steering)) | |
display_str.append("THROTTLE:{:.2f}".format(self.throttle)) | |
display_str.append("I YELLOW:{:d}".format(max_yellow)) | |
display_str.append("CONF:{:.2f}".format(confidense)) | |
y = 10 | |
x = 10 | |
for s in display_str: | |
cv2.putText(img, s, color=(0,255,255), org=(x,y), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.4) | |
y += 10 | |
cv2.namedWindow('image', cv2.WINDOW_NORMAL) | |
cv2.imshow("image", img) | |
cv2.resizeWindow('image', 300,300) | |
cv2.waitKey(1) | |
def drive(cfg, args): | |
''' | |
Construct a working robotic vehicle from many parts. | |
Each part runs as a job in the Vehicle loop, calling either | |
it's run or run_threaded method depending on the constructor flag `threaded`. | |
All parts are updated one after another at the framerate given in | |
cfg.DRIVE_LOOP_HZ assuming each part finishes processing in a timely manner. | |
Parts may have named outputs and inputs. The framework handles passing named outputs | |
to parts requesting the same named input. | |
''' | |
#Initialize car | |
V = dk.vehicle.Vehicle() | |
#Camera | |
if cfg.DONKEY_GYM: | |
from donkeycar.parts.dgym import DonkeyGymEnv | |
cfg.GYM_CONF['racer_name'] = args['--name'] | |
cfg.GYM_CONF['car_name'] = args['--name'] | |
cam = DonkeyGymEnv(cfg.DONKEY_SIM_PATH, host=cfg.SIM_HOST, env_name=cfg.DONKEY_GYM_ENV_NAME, conf=cfg.GYM_CONF, delay=cfg.SIM_ARTIFICIAL_LATENCY) | |
inputs = ['steering', 'throttle'] | |
else: | |
from donkeycar.parts.camera import PiCamera | |
cam = PiCamera(image_w=cfg.IMAGE_W, image_h=cfg.IMAGE_H, image_d=cfg.IMAGE_DEPTH) | |
inputs = [] | |
V.add(cam, inputs=inputs, outputs=['cam/image_array'], threaded=True) | |
#Controller | |
V.add(LineFollower(), | |
inputs=['cam/image_array'], | |
outputs=['steering', 'throttle', 'recording']) | |
#Drive train setup | |
if not cfg.DONKEY_GYM: | |
from donkeycar.parts.actuator import PCA9685, PWMSteering, PWMThrottle | |
steering_controller = PCA9685(cfg.STEERING_CHANNEL, cfg.PCA9685_I2C_ADDR, busnum=cfg.PCA9685_I2C_BUSNUM) | |
steering = PWMSteering(controller=steering_controller, | |
left_pulse=cfg.STEERING_LEFT_PWM, | |
right_pulse=cfg.STEERING_RIGHT_PWM) | |
throttle_controller = PCA9685(cfg.THROTTLE_CHANNEL, cfg.PCA9685_I2C_ADDR, busnum=cfg.PCA9685_I2C_BUSNUM) | |
throttle = PWMThrottle(controller=throttle_controller, | |
max_pulse=cfg.THROTTLE_FORWARD_PWM, | |
zero_pulse=cfg.THROTTLE_STOPPED_PWM, | |
min_pulse=cfg.THROTTLE_REVERSE_PWM) | |
V.add(steering, inputs=['steering']) | |
V.add(throttle, inputs=['throttle']) | |
#add tub to save data | |
inputs=['cam/image_array', | |
'steering', 'throttle'] | |
types=['image_array', | |
'float', 'float'] | |
th = TubHandler(path=cfg.DATA_PATH) | |
tub = th.new_tub_writer(inputs=inputs, types=types) | |
V.add(tub, inputs=inputs, outputs=["tub/num_records"], run_condition="recording") | |
#run the vehicle | |
V.start(rate_hz=cfg.DRIVE_LOOP_HZ, | |
max_loop_count=cfg.MAX_LOOPS) | |
if __name__ == '__main__': | |
args = docopt(__doc__) | |
cfg = dk.load_config() | |
drive(cfg, args) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment