Skip to content

Instantly share code, notes, and snippets.

@Davidnet
Created April 7, 2018 00:16
Show Gist options
  • Save Davidnet/96f294b98df7a7a04b1371c52c521585 to your computer and use it in GitHub Desktop.
Save Davidnet/96f294b98df7a7a04b1371c52c521585 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import numpy as np
import cv2, time, re, subprocess, os
from threading import Thread, Event
import json
# NOTE(review): NUM_LIDARS and LATERAL_CUTOFF are not referenced anywhere in
# the visible part of this file; their meaning/units cannot be confirmed here.
NUM_LIDARS = 3
LATERAL_CUTOFF = 150
# Directory that contains this script (symlinks resolved); used to locate the
# optional "camera_error.jpg" placeholder image.
dir_path = os.path.dirname(os.path.realpath(__file__))
# PORTS maps a camera tag to the USB port string expected for that camera.
# The values are compared against the port strings that find_usb_ports()
# extracts from udevadm device paths.
# NOTE(review): tags presumably stand for Back / Left / Center / Right /
# far-Left / far-Right -- confirm with the hardware layout.
# Layout for the Chinese hub (valid if the cameras are plugged in the same way):
PORTS = {'B': '2.1', 'L': '2.2', 'C': '2.3', 'R': '2.4.1', 'LL': '2.4.3', 'RR': '2.4.4'}
# Alternative layout for the ANKER hub:
# PORTS = {'B': '2.2', 'L': '2.3', 'C': '2.4.', 'R': '2.1.2', 'LL': '2.1.3', 'RR': '2.1.4'}
### Supported Camera Resolutions
# Aspect ratio 4:3
# (240, 320)
# (480, 640)
# (600, 800)
# Aspect ratio 16:9
# (180, 320)
# (360, 640)
# (720, 1280)
# (1080, 1920)
class CameraHandler(Thread):
    """Daemon thread that keeps the most recent frame of one camera in ``image``.

    The constructor tries to open ``/dev/video<camera_number>`` and read a
    first frame. If that fails (or ``camera_number`` is None), ``video_handler``
    is set to None and ``image`` holds an error picture instead, so consumers
    can always read ``image``.

    Attributes:
        grab_frames: when False the thread idles instead of reading frames.
        video_device: device path built from ``camera_number``.
        height, width: frame size; replaced by the size reported by the
            driver once a first frame has been read successfully.
        camera_id: human readable tag used in log messages.
        run_event: clear this event to make ``run`` leave its loop.
        video_handler: the ``cv2.VideoCapture`` or None when unavailable.
        grabbed: result flag of the last ``read`` call.
        image: last frame read, or the error picture.
    """

    # Optional picture displayed when a camera cannot be read; when the file
    # is missing a black frame with a text message is generated instead.
    camera_error_filename = os.path.join(dir_path, "camera_error.jpg")

    def __init__(self, camera_number, height, width, camera_id, start_reading = True):
        """Open the camera and load a first frame.

        Args:
            camera_number: index N of ``/dev/videoN`` or None when the camera
                was not found.
            height: requested frame height in pixels.
            width: requested frame width in pixels.
            camera_id: tag used in messages (e.g. "C").
            start_reading: initial value of ``grab_frames``.
        """
        super(CameraHandler, self).__init__()
        self.grab_frames = start_reading
        self.video_device = "/dev/video" + str(camera_number)
        self.height, self.width = height, width
        self.camera_id = camera_id
        self.run_event = Event()
        self.run_event.set()
        self.daemon = True
        self.grabbed = False
        self.image = None
        self.video_handler = None

        # Do not even try to open "/dev/videoNone" for a missing camera.
        capture = None
        if camera_number is not None:
            capture = cv2.VideoCapture(self.video_device)

        if capture is not None and capture.isOpened():
            self.video_handler = capture
            # Set desired size
            self.set_video_size((height, width))
            print("Loading {} ({}) first image...".format(self.camera_id, self.video_device))
            try:
                self.grabbed, frame = capture.read()
                if self.grabbed:
                    self.image = frame
                    # Keep the size actually delivered by the driver; it may
                    # differ from the requested one.
                    self.height, self.width = (
                        int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                        int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)))
            except Exception as e:
                print("Some error ocurred reading frames from camera {} (device {}) -> {}".format(self.camera_id, self.video_device, e))
                self.grabbed = False

        if not self.grabbed:
            if camera_number is not None:
                print("Camera {} on {} could not be opened, may have been disconnected!".format(self.camera_id, self.video_device))
            if capture is not None:
                # Free the device instead of just dropping the reference.
                capture.release()
            self.video_handler = None
            self.set_error_image("NO {} CAMERA FOUND!".format(self.camera_id))

    def set_error_image(self, error_msg):
        """Replace ``image`` with an error picture of size (height, width).

        Uses ``camera_error_filename`` when that file exists and can be
        decoded; otherwise draws ``error_msg`` on a black frame.
        """
        picture = None
        if os.path.isfile(self.camera_error_filename):
            # imread returns None (no exception) for unreadable files.
            picture = cv2.imread(self.camera_error_filename)
        if picture is None:
            picture = np.zeros((self.height, self.width, 3), dtype=np.uint8)
            cv2.putText(picture, error_msg, (self.width//8, self.height//2),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255,255,255), 4)
        else:
            picture = cv2.resize(picture, (self.width, self.height))
        # Flip because real camera images are read flipped (consumers flip
        # every image back).
        self.image = cv2.flip(picture, -1)

    def set_video_size(self, size):
        """Request a frame size from the driver; ``size`` is (height, width)."""
        self.video_handler.set(cv2.CAP_PROP_FRAME_HEIGHT, size[0])
        self.video_handler.set(cv2.CAP_PROP_FRAME_WIDTH, size[1])

    def set_grab_frames(self, value):
        """Enable (True) or pause (False) frame reading in ``run``."""
        self.grab_frames = value

    def run(self):
        """Read frames until ``run_event`` is cleared or a read fails.

        Does nothing when the camera could not be opened. The capture device
        is released on every exit path, including the failure ones.

        Raises:
            Exception: when ``read`` reports no frame (camera unplugged while
                running); this terminates the thread.
        """
        if self.video_handler is None:
            return
        try:
            while self.run_event.is_set():
                if not self.grab_frames:
                    time.sleep(0.03) # wait as if a frame were read
                    continue
                try:
                    self.grabbed, image = self.video_handler.read()
                except Exception as e: # possibly a "select timeout" error
                    print("Some error ocurred reading frames from camera {} (device {}) -> {}".format(self.camera_id, self.video_device, e))
                    self.set_error_image("READ {} IMAGE ERROR!".format(self.camera_id))
                    break
                if not self.grabbed: # hot unplugged error
                    print("Some error ocurred reading frames from camera {} (device {})".format(self.camera_id, self.video_device))
                    self.set_error_image("READ {} IMAGE ERROR!".format(self.camera_id))
                    raise Exception('HOT Unplugged!')
                self.image = image
        finally:
            # Previously skipped when the hot-unplug exception was raised.
            self.video_handler.release()
########## NEW Check Video Devices ##################
def find_video_devices():
    """Return the sorted, unique numbers N of all ``/dev/videoN`` entries.

    The directory is listed directly instead of parsing ``ls -l /dev`` through
    a shell: no subprocess is needed, and lines of symlinks whose *target*
    ends in ``videoN`` are no longer mistaken for devices.

    Returns:
        list of int, ascending; empty when ``/dev`` cannot be listed.
    """
    # Case-insensitive like the original pattern, but anchored to the whole
    # entry name.
    pattern = re.compile(r"video(?P<video>\d+)$", re.I)
    try:
        entries = os.listdir("/dev")
    except OSError:
        return []
    numbers = set()
    for entry in entries:
        info = pattern.match(entry)
        if info:
            numbers.add(int(info.group("video")))
    return sorted(numbers)
# Find the USB port of each given video device number.
def find_usb_ports(cameras):
    """Return the USB port string of every video device number in ``cameras``.

    The port is parsed from the sysfs path reported by
    ``udevadm info --query=path --name=/dev/video<N>``: the path component
    right before ``video4linux`` is matched against ``<digit>-<port>...`` and
    ``<port>`` (digits and dots) is kept.

    The result always has exactly one entry per input camera, in the same
    order, so callers can zip both lists. When the port cannot be determined
    (udevadm failure, unexpected path) the entry is ``'None-<N>'``.

    Args:
        cameras: iterable of video device numbers.

    Returns:
        list of str, same length as ``cameras``.
    """
    avail_ports = []
    pattern = re.compile(r"\d-(?P<video>[0-9.]+).+", re.I)
    for cam in cameras:
        placeholder = 'None-{}'.format(cam)
        try:
            # List of physical ports used in /dev/video# (some devices may
            # represent the same thing). Argument list: no shell involved.
            path = subprocess.check_output(
                ["udevadm", "info", "--query=path", "--name=/dev/video" + str(cam)]
            ).decode('utf-8')
        except (subprocess.CalledProcessError, OSError, ValueError) as e:
            print("----- ERROR READING VIDEO DEVICE ----- (Error: {})".format(e))
            avail_ports.append(placeholder)
            continue
        print(path)
        marker = path.find("video4linux")
        # Component right before "video4linux" is where the port is explicit.
        segments = path[:marker].split('/') if marker != -1 else []
        info = pattern.match(segments[-2]) if len(segments) >= 2 else None
        # Always append something so the result stays aligned with `cameras`.
        avail_ports.append(info.group("video") if info else placeholder)
    return avail_ports
# Find the video device numbers that correspond to distinct USB ports (several /dev/video entries may share the same port).
def find_cameras():
    """Map each detected USB port string to a video device number.

    Devices come from find_video_devices() and their ports from
    find_usb_ports(); when several devices report the same port, the first
    one in device order is kept.

    Returns:
        dict of port string -> video device number.
    """
    device_numbers = find_video_devices()
    port_to_device = {}
    for port, number in zip(find_usb_ports(device_numbers), device_numbers):
        # setdefault keeps the entry already stored for a repeated port.
        port_to_device.setdefault(port, number)
    return port_to_device
def number2video(num):
    """Return the device path ``/dev/video<num>`` for a video number."""
    return "".join(("/dev/video", str(num)))
def get_video_device(desired_camera, usb_ports_cameras):
    """Look up the video device number of a camera tag.

    Args:
        desired_camera: key of PORTS (a KeyError is raised for unknown tags).
        usb_ports_cameras: dict of port string -> video device number.

    Returns:
        The device number stored for the tag's port, or None when that port
        is not present in ``usb_ports_cameras``.
    """
    expected_port = PORTS[desired_camera]
    return usb_ports_cameras.get(expected_port)
def check_cameras(camera_handlers, camera_tags):
    """Print a warning for every handler whose ``video_handler`` is None.

    Handlers and tags are paired positionally; the tag is used in the message.
    """
    disconnected = (
        label
        for handler, label in zip(camera_handlers, camera_tags)
        if handler.video_handler is None
    )
    for label in disconnected:
        print("WARN: Camera {} is not connected or not beeing recognized".format(label))
def get_valid_video_handler(camera_handlers):
    """Return the first non-None ``video_handler`` among the handlers.

    Returns None when every handler lacks one or the sequence is empty.
    """
    usable = (
        handler.video_handler
        for handler in camera_handlers
        if handler.video_handler is not None
    )
    return next(usable, None)
##### Main ############
def main():
    """Detect the cameras, start one reader thread per camera and poll frames.

    Each camera tag is resolved to its video device through the PORTS table
    (None when the expected port is not present), so every tag gets a handler
    bound to its own device. The last two cameras start with frame grabbing
    paused. The polling loop runs until interrupted; on exit the reader
    threads are asked to stop so they can release their devices.
    """
    cameras = ["LL", "C", "RR", "B", "L", "R"]
    height, width, num_cameras = 480, 640, len(cameras)
    usb_ports_cameras = find_cameras()
    print(usb_ports_cameras)
    video_numbers = [get_video_device(tag, usb_ports_cameras) for tag in cameras]
    print(video_numbers)
    initiate_cameras = [True] * num_cameras
    initiate_cameras[-2:] = [False, False]
    # Pair every tag with the device resolved for it. Using the raw values of
    # usb_ports_cameras paired tags with arbitrary devices and dropped the
    # cameras beyond the number of detected ports.
    camera_handlers = [CameraHandler(video, height, width, camera_id, start_reading = start)
                       for video, start, camera_id in zip(video_numbers, initiate_cameras, cameras)]
    # Explicit loop: on Python 3 a bare map() is lazy and never starts anything.
    for handler in camera_handlers:
        handler.start()
    try:
        while True:
            # Frames are stored flipped; flip them back. `images` is where
            # downstream processing would consume the current frames.
            images = [cv2.flip(handler.image, -1) for handler in camera_handlers]
            time.sleep(1/30.0)
    finally:
        # Let every reader leave its loop and release its capture device.
        for handler in camera_handlers:
            handler.run_event.clear()
        for handler in camera_handlers:
            # Bounded wait: a reader may be blocked inside a frame read.
            handler.join(1.0)
# Run the capture loop only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment