-
-
Save niteshgaba/2148e78ae5276907fc49c4eddb44446e to your computer and use it in GitHub Desktop.
Arducam
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import cv2 | |
from multiprocessing import Process, Queue | |
from threading import Thread | |
from datetime import datetime | |
from datetime import timedelta | |
import os | |
import time | |
from timeit import default_timer as timer | |
import operator | |
import queue | |
from multiprocessing import Manager | |
def gstreamer_pipeline(
    capture_width=1920,
    capture_height=1080,
    display_width=1920,
    display_height=1080,
    framerate=60,
    flip_method=0,
):
    """Return the GStreamer pipeline description used to open the camera.

    The string chains ``nvarguscamerasrc`` -> ``nvvidconv`` -> ``videoconvert``
    -> ``appsink`` and is meant to be handed to ``cv2.VideoCapture`` together
    with ``cv2.CAP_GSTREAMER``.

    Args:
        capture_width: Width requested from the camera source caps.
        capture_height: Height requested from the camera source caps.
        display_width: Width of the frames after ``nvvidconv``.
        display_height: Height of the frames after ``nvvidconv``.
        framerate: Numerator of the ``framerate=(fraction)N/1`` cap.
        flip_method: Value substituted into ``nvvidconv flip-method=``.

    Returns:
        The fully formatted pipeline string.
    """
    # NOTE(review): "aelock=true" presumably locks auto-exposure on the
    # source element - confirm against the nvarguscamerasrc documentation.
    template = (
        "nvarguscamerasrc aelock=true ! "
        "video/x-raw(memory:NVMM), "
        "width=(int)%d, height=(int)%d, "
        "format=(string)NV12, framerate=(fraction)%d/1 ! "
        "nvvidconv flip-method=%d ! "
        "video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx ! "
        "videoconvert ! "
        "video/x-raw, format=(string)BGR ! appsink"
    )
    # Order must follow the placeholders in the template above.
    values = (
        capture_width,
        capture_height,
        framerate,
        flip_method,
        display_width,
        display_height,
    )
    return template % values
def __process_job(delta_x, delta_y, delta_z, delta_o, speed, iteration, move_output_queue):
    """Stand-in for the movement job: wait, then report completion.

    The ``delta_*``, ``speed`` and ``iteration`` arguments are accepted but
    not used by this implementation.

    Args:
        move_output_queue: Queue that receives ``True`` once the wait is over.
    """
    move_duration_seconds = 5
    time.sleep(move_duration_seconds)

    move_finished = True
    move_output_queue.put(move_finished)
def __multiple_shots(camera, move_output_queue, frame_output_queue, all_files):
    """Grab frames continuously until a command arrives on the move queue.

    While ``move_output_queue`` is empty, each iteration reads one frame from
    ``camera``, stores a copy in ``all_files`` under a timestamp key and
    publishes that key on ``frame_output_queue``.  As soon as a non-``None``
    command is received, ``None`` is published as the end-of-stream marker
    and the function returns.

    Args:
        camera: Object whose ``read()`` returns a ``(status, frame)`` pair.
        move_output_queue: Queue polled (non-blocking) for the stop command.
        frame_output_queue: Queue receiving frame keys, then a final ``None``.
        all_files: Mapping of timestamp key -> copied frame.
    """
    while True:
        try:
            command = move_output_queue.get_nowait()
        except queue.Empty:
            # No stop command yet: capture one more frame.
            captured_at = datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
            _, image = camera.read()
            if image is None:
                continue
            # Store before publishing so the consumer can always find the key.
            all_files[captured_at] = image.copy()
            frame_output_queue.put(captured_at)
            continue

        # A ``None`` command is ignored; anything else stops the capture.
        if command is None:
            continue
        frame_output_queue.put(None)
        return
def get_brenner_score(frame):
    """Return the score assigned to *frame*.

    Placeholder implementation: the frame is ignored and every frame
    receives the same constant score of 1.
    """
    placeholder_score = 1
    return placeholder_score
def generate_score(process_count, frame_output_queue, score_output_queue, all_files):
    """Score captured frames until the end-of-stream marker is received.

    Each key taken from ``frame_output_queue`` is looked up in ``all_files``;
    the frame is scored with ``get_brenner_score`` and a
    ``{"frame_captured": key, "score": score}`` dict is put on
    ``score_output_queue``.  A key that is not (yet) in ``all_files`` is put
    back on ``frame_output_queue``.  A ``None`` key is forwarded to
    ``score_output_queue`` and ends the worker.

    Args:
        process_count: Identifier of this worker, used only in log output.
        frame_output_queue: Queue of frame keys, terminated by ``None``.
        score_output_queue: Queue receiving score dicts, then ``None``.
        all_files: Mapping of frame key -> frame.
    """
    while True:
        print("ALL FILES IN SCORE", len(all_files))
        frame_key = frame_output_queue.get()

        # End-of-stream: pass the marker downstream and stop.
        if frame_key is None:
            print("frame_output is none")
            score_output_queue.put(None)
            print("generate_score worker exited!", process_count)
            return

        # Frame not available: hand the key back for a later attempt.
        if frame_key not in all_files:
            print("Missing- requeue", frame_key)
            frame_output_queue.put(frame_key)
            continue

        score = get_brenner_score(all_files[frame_key])
        print("Queue", process_count, score)
        score_output_queue.put({"frame_captured": frame_key, "score": score})
def __process_frames(camera, folder_path, extension, delta_x, delta_y, delta_z, delta_o, speed, iteration, final_output_queue, prefix, top_percentage):
    """Capture frames during one movement, score them and save the best ones.

    Three worker threads are started: the movement job, the frame grabber and
    the scorer.  Score rows are collected until the scorer sends ``None``;
    then the top ``top_percentage`` percent (by ``"score"``, descending) are
    written to ``folder_path`` as ``<prefix>_<frame key>.<extension>``.

    Args:
        camera: Capture object handed to the frame grabber thread.
        folder_path: Directory the selected frames are written to.
        extension: File extension (without dot) for the written images.
        delta_x, delta_y, delta_z, delta_o, speed, iteration: Forwarded
            unchanged to the movement job.
        final_output_queue: Queue that receives the result list, a list of
            ``[file_name, score]`` pairs for the frames that were written.
        prefix: File name prefix for the written images.
        top_percentage: Percentage of scored frames to keep.  The count is
            truncated with ``int()``, so a small capture can select zero
            frames.
    """
    move_output_queue = Queue(maxsize=0)
    frame_output_queue = Queue(maxsize=0)
    score_output_queue = Queue(maxsize=0)
    processed_file_details = []
    file_details = []

    # The manager owns a server process; the ``with`` block guarantees it is
    # shut down when this function is done, even if an error occurs.
    # NOTE(review): every worker here is a Thread, so each frame is pickled
    # through the manager process for no benefit - a plain dict would
    # presumably do unless the workers are meant to become Processes.
    with Manager() as manager:
        all_files = manager.dict()

        # start the movement
        mover = Thread(target=__process_job, args=(delta_x, delta_y, delta_z, delta_o, speed, iteration, move_output_queue))
        mover.start()
        # start the capture
        grabber = Thread(target=__multiple_shots, args=(camera, move_output_queue, frame_output_queue, all_files))
        grabber.start()
        scorer = Thread(target=generate_score, args=(0, frame_output_queue, score_output_queue, all_files, ))
        scorer.start()

        exit_count = 0
        while True:
            segment_output_row = score_output_queue.get()
            if segment_output_row is None:
                # Only one scorer is started, so its marker ends collection.
                exit_count += 1
                print("file worker exit count ", exit_count)
                break
            processed_file_details.append(segment_output_row)

        # Make sure no worker still touches the queues or the shared dict
        # before they are read for the last time and torn down.
        for worker in (mover, grabber, scorer):
            worker.join()

        top_x = int(len(processed_file_details) * top_percentage / 100)
        processed_file_details.sort(key=operator.itemgetter("score"), reverse=True)
        print("All work Done, time to store ", top_x, "out of", len(all_files))

        for details in processed_file_details[:top_x]:
            key = details["frame_captured"]
            img_file_name = "{}_{}.{}".format(prefix, key, extension)
            file_name = os.path.join(folder_path, img_file_name)
            print("writing (cap, writing)", file_name, key, datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f'))
            # imwrite reports failure through its return value; only report
            # files that were actually saved.
            if cv2.imwrite(file_name, all_files[key]):
                file_details.append([file_name, details["score"]])
            else:
                print("Failed to write", file_name)

        all_files.clear()

    move_output_queue.close()
    frame_output_queue.close()
    score_output_queue.close()
    final_output_queue.put(file_details)
def __continous_move_internal(cam, folder_path, delta_x, delta_y, delta_z, delta_o, speed, iteration, prefix, top_percentage):
    """Run one capture-and-score pass and return the files it wrote.

    Calls ``__process_frames`` with the ``"jpg"`` extension, prints the
    elapsed wall-clock time and returns the result it published.

    Args:
        cam: Capture object forwarded to ``__process_frames``.
        folder_path: Directory the selected frames are written to.
        delta_x, delta_y, delta_z, delta_o, speed, iteration: Forwarded
            unchanged to ``__process_frames``.
        prefix: File name prefix for the written images.
        top_percentage: Percentage of scored frames to keep.

    Returns:
        The object ``__process_frames`` put on its output queue.
    """
    final_output_queue = Queue(maxsize=0)
    start = time.time()
    try:
        __process_frames(cam, folder_path, "jpg", delta_x, delta_y, delta_z, delta_o, speed, iteration, final_output_queue, prefix, top_percentage)
        # __process_frames is synchronous and has already queued its result.
        # A blocking get() waits for the queue to deliver it; polling
        # Queue.empty() is documented as unreliable and only adds latency.
        all_files = final_output_queue.get()
        end = time.time()
        print("A----->", (end - start))
    finally:
        final_output_queue.close()
    return all_files
def main():
    """Open the camera, run one capture pass and print the saved files.

    Prompts for a sample id, creates ``<cwd>/testing/<sample id>`` and runs
    ``__continous_move_internal`` against it.  Returns ``None``; when the
    camera cannot be opened it prints a message and returns early.
    """
    cam = cv2.VideoCapture(gstreamer_pipeline(flip_method=0), cv2.CAP_GSTREAMER)
    if not cam.isOpened():
        print("Device not found")
        return None

    try:
        cwd = os.getcwd()
        sample_id = input("Start Sample Code : ")
        folder_path = os.path.join(cwd, 'testing', sample_id)
        # Reusing a sample id must not abort the run; file names carry a
        # timestamp, so earlier captures in the folder are not overwritten.
        os.makedirs(folder_path, exist_ok=True)
        all_files = __continous_move_internal(cam, folder_path, 0, 0, 0, 1, 10000, 1, "PRE1", 5)
        print(all_files)
    finally:
        # Release the camera even if the capture pass raised.
        cam.release()
    print("Are we oK")
# Guard the entry point: this module starts multiprocessing children (the
# Manager server), and under the "spawn" start method the main module is
# re-imported in the child, which would otherwise run main() again.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment