Created
April 7, 2018 00:16
-
-
Save Davidnet/96f294b98df7a7a04b1371c52c521585 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import numpy as np | |
import cv2, time, re, subprocess, os | |
from threading import Thread, Event | |
import json | |
NUM_LIDARS = 3 | |
LATERAL_CUTOFF = 150 | |
dir_path = os.path.dirname(os.path.realpath(__file__)) | |
# If they are connected in the same way, Chinese HUB | |
PORTS = {'B': '2.1', 'L': '2.2', 'C': '2.3', 'R': '2.4.1', 'LL': '2.4.3', 'RR': '2.4.4'} | |
# ANKER hub | |
# PORTS = {'B': '2.2', 'L': '2.3', 'C': '2.4.', 'R': '2.1.2', 'LL': '2.1.3', 'RR': '2.1.4'} | |
### Supported Camera Resolutions | |
# Aspect ratio 4:3 | |
# (240, 320) | |
# (480, 640) | |
# (600, 800) | |
# Aspect ratio 16:9 | |
# (180, 320) | |
# (360, 640) | |
# (720, 1280) | |
# (1080, 1920) | |
class CameraHandler(Thread): | |
camera_error_filename = os.path.join(dir_path, "camera_error.jpg") | |
def __init__(self, camera_number, height, width, camera_id, start_reading = True): | |
super(CameraHandler,self).__init__() | |
self.grab_frames = start_reading | |
self.video_device = "/dev/video" + str(camera_number) | |
self.height, self.width = height, width | |
self.camera_id = camera_id | |
self.run_event = Event() | |
self.run_event.set() | |
self.daemon = True | |
self.video_handler = cv2.VideoCapture(self.video_device) | |
if self.video_handler.isOpened(): #opens successfully the camera | |
# Set desired size | |
self.set_video_size((height, width)) | |
print("Loading {} ({}) first image...".format(self.camera_id, self.video_device)) | |
try: | |
self.grabbed, self.image = self.video_handler.read() | |
self.height, self.width = (int(self.video_handler.get(cv2.CAP_PROP_FRAME_HEIGHT)), | |
int(self.video_handler.get(cv2.CAP_PROP_FRAME_WIDTH))) | |
except Exception as e: | |
print("Some error ocurred reading frames from camera {} (device {}) -> {}".format(self.camera_id, self.video_device, e)) | |
self.grabbed = False | |
if not self.video_handler.isOpened() or not self.grabbed: | |
if camera_number is not None: | |
print("Camera {} on {} could not be opened, may have been disconnected!".format(self.camera_id, self.video_device)) | |
self.video_handler = None | |
self.set_error_image("NO {} CAMERA FOUND!".format(self.camera_id)) | |
def set_error_image(self, error_msg): | |
if not os.path.isfile(self.camera_error_filename): | |
self.image = np.zeros((self.height, self.width, 3 ), dtype=np.uint8) | |
cv2.putText(self.image, error_msg, (self.width//8, self.height//2), | |
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255,255,255), 4) | |
else: | |
self.image = cv2.resize(cv2.imread(self.camera_error_filename), ( | |
self.width, self.height)) | |
self.image = cv2.flip(self.image,-1) #flip because camera real images are read fliped | |
def set_video_size(self, size): | |
self.video_handler.set(cv2.CAP_PROP_FRAME_HEIGHT, size[0]) | |
self.video_handler.set(cv2.CAP_PROP_FRAME_WIDTH, size[1]) | |
def set_grab_frames(self, value): | |
self.grab_frames = value | |
def run(self): | |
if self.video_handler is not None: | |
while self.run_event.is_set(): | |
if self.grab_frames: | |
try: | |
self.grabbed, image = self.video_handler.read() | |
except Exception as e: #possible "selec timeout" error | |
print("Some error ocurred reading frames from camera {} (device {}) -> {}".format(self.camera_id, self.video_device, e)) | |
self.set_error_image("READ {} IMAGE ERROR!".format(self.camera_id)) | |
break | |
if not self.grabbed: #hot Unplugged error | |
print("Some error ocurred reading frames from camera {} (device {})".format(self.camera_id, self.video_device)) | |
self.set_error_image("READ {} IMAGE ERROR!".format(self.camera_id)) | |
raise Exception('HOT Unplugged!') | |
self.image = image | |
else: | |
time.sleep(0.03) # wait as if a frame were read | |
self.video_handler.release() | |
########## NEW Check Video Devices ################## | |
def find_video_devices(): | |
p = re.compile(r".+video(?P<video>\d+)$", re.I) | |
devices = subprocess.check_output("ls -l /dev", shell=True).decode('utf-8') | |
avail_cameras = [] | |
for device in devices.split('\n'): | |
if device: | |
info = p.match(device) | |
if info: | |
dinfo = info.groupdict() | |
avail_cameras.append(dinfo["video"]) | |
return list(sorted(map(int,avail_cameras))) | |
# find usb port given a video number device | |
def find_usb_ports(cameras): | |
avail_ports = [] | |
p = re.compile(r"\d-(?P<video>[0-9.]+).+", re.I) | |
for cam in cameras: | |
try: | |
# List of physical ports used in /dev/video# (some devices maybe represent same thing) | |
path = subprocess.check_output("udevadm info --query=path --name=/dev/video" + str(cam), shell=True).decode('utf-8') | |
except Exception as e: | |
print("----- ERROR READING VIDEO DEVICE ----- (Error: {})".format(e)) | |
avail_ports.append('None-{}'.format(cam)) | |
continue | |
print(path) | |
path = path[0:path.find("video4linux")].split('/')[-2] #get last item where the port is explicit | |
info = p.match(path) #get actual address | |
if info: | |
dinfo = info.groupdict() | |
avail_ports.append(dinfo["video"]) | |
return avail_ports | |
# Actually finds the camera numbers that correspond to real cameras (devices may be repeated) | |
def find_cameras(): | |
cams = find_video_devices() | |
avail_ports = find_usb_ports(cams) | |
usb_ports_cameras = dict() | |
for port, cam in zip(avail_ports, cams): | |
if not port in usb_ports_cameras: | |
usb_ports_cameras[port] = cam | |
return usb_ports_cameras | |
def number2video(num): | |
return "/dev/video"+str(num) | |
def get_video_device(desired_camera, usb_ports_cameras): | |
port = PORTS[desired_camera] | |
if not port in usb_ports_cameras: | |
return None | |
else: | |
return usb_ports_cameras[port] | |
def check_cameras(camera_handlers, camera_tags): | |
for camera, tag in zip(camera_handlers, camera_tags): | |
if camera.video_handler is None: | |
print("WARN: Camera {} is not connected or not beeing recognized".format(tag)) | |
def get_valid_video_handler(camera_handlers): | |
for camera in camera_handlers: | |
if camera.video_handler is not None: | |
return camera.video_handler | |
return None | |
##### Main ############ | |
def main(): | |
cameras = ["LL", "C", "RR", "B", "L", "R"] | |
height, width, num_cameras = 480, 640, len(cameras) | |
usb_ports_cameras = find_cameras() | |
print(usb_ports_cameras) | |
video_numbers = list(map(lambda desired_camera: get_video_device(desired_camera, usb_ports_cameras), cameras)) | |
print(video_numbers) | |
initiate_cameras = [True]*num_cameras | |
initiate_cameras[-2:] = [False, False] | |
camera_handlers = [CameraHandler(video, height, width, camera_id, start_reading = start) | |
for video, start, camera_id in zip(usb_ports_cameras.values(), initiate_cameras, cameras)] | |
map(lambda o: o.start(), camera_handlers) | |
while True: | |
images = list(map(lambda o: cv2.flip(o.image,-1), camera_handlers)) | |
time.sleep(1/30.0) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment