Sample Python app for ingesting a video camera feed into Pravega
#!/usr/bin/env python3
"""
This script reads video from a camera and writes the frames to Pravega.
It also detects motion and can send alerts (via an HTTP call)
when motion is above a threshold.
It uses the Pravega GRPC Gateway (https://github.com/pravega/pravega-grpc-gateway).
"""

import cv2
import logging
import time
import grpc
import json
import base64
import argparse
import multiprocessing
import queue
import traceback
import numpy as np
import requests
import pravega.grpc_gateway as pravega


def video_capture_generator(vidcap, args):
    """Yield frames read from the OpenCV capture as dicts containing the image,
    frame number, and capture timestamp in milliseconds."""
    while True:
        pos_frames = vidcap.get(cv2.CAP_PROP_POS_FRAMES)
        success, image = vidcap.read()
        if success:
            video_frame = dict(
                image=image,
                frameNumber=int(pos_frames),
                timestamp=int(time.time() * 1000),
            )
            yield video_frame


def crop_image(img, top=0, bottom=100, left=90, right=235):
    """Crop the frame to the region of interest used for motion detection."""
    cropped = img[top:bottom, left:right, :]
    return cropped


def image_pixel_distance(img1, img2):
    """Return the per-pixel Euclidean (Pythagorean) distance between two frames,
    normalized so that each pixel distance is in the range [0, 1]."""
    img1_32 = np.float32(img1) / 255.0
    img2_32 = np.float32(img2) / 255.0
    diff = img1_32 - img2_32
    dist = np.sqrt(diff[:, :, 0]**2 + diff[:, :, 1]**2 + diff[:, :, 2]**2) / np.sqrt(3.0)
    return dist


def image_distance(img1, img2, blur=15, pixel_threshold=0.06, max_valid_distance=0.12):
    """Return a motion score: the fraction of cropped, blurred pixels whose
    distance between the two frames exceeds pixel_threshold."""
    img1 = crop_image(img1)
    img2 = crop_image(img2)
    pixel_dist = image_pixel_distance(img1, img2)
    blurred_dist = cv2.GaussianBlur(pixel_dist, (blur, blur), 0)
    _, thresh = cv2.threshold(blurred_dist, pixel_threshold, 1.0, 0)
    distance = thresh.mean()
    # If too many pixels changed, it is likely that the lighting changed.
    # This should not be detected as motion, so the score is negated.
    if distance > max_valid_distance:
        distance = 0.0 - distance
    return distance
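
# Worked example of the score above (values are illustrative): if 8% of the cropped,
# blurred pixels changed by more than pixel_threshold, image_distance returns 0.08,
# which exceeds the default --motion_threshold of 0.0 and is treated as motion.
# If 20% of the pixels changed, 0.20 exceeds max_valid_distance, so -0.20 is
# returned and treated as a lighting change rather than motion.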


def goaway(args):
    logging.error('#################### MOTION DETECTED ####################')
    try:
        response = requests.post(args.goaway_url)
        logging.info('goaway: response=%s' % str(response))
    except requests.RequestException:
        # A failed alert call should not stop the capture loop.
        logging.error('goaway: ' + traceback.format_exc())


def inference_generator(video_frames, args):
    """Annotate each frame with a motion score and alert flags by comparing it to the previous frame."""
    prev_video_frame = None
    for video_frame in video_frames:
        if prev_video_frame is None:
            video_frame['distance'] = 0.0
            video_frame['has_motion'] = False
            video_frame['alert'] = False
        else:
            distance = float(image_distance(prev_video_frame['image'], video_frame['image']))
            has_motion = distance > args.motion_threshold
            # Alert on motion and also on suspected lighting changes (negative distance).
            alert = (has_motion or distance < 0.0)
            logging.info('inference_generator: alert=%d, has_motion=%d, distance=%f',
                alert, has_motion, distance)
            video_frame['distance'] = distance
            video_frame['has_motion'] = has_motion
            video_frame['alert'] = alert
            if has_motion:
                goaway(args)
        prev_video_frame = video_frame
        yield video_frame


def events_to_write_generator(video_frame_iter, scope, stream, args):
    """Convert annotated frames into Pravega WriteEventsRequest messages, with the
    frame PNG-encoded, base64-wrapped, and serialized as JSON."""
    for video_frame in video_frame_iter:
        event_dict = video_frame.copy()
        event_dict['camera'] = args.camera
        event_dict['ssrc'] = 0
        success, png_array = cv2.imencode('.png', video_frame['image'])
        event_dict['data'] = base64.b64encode(png_array.tobytes()).decode(encoding='UTF-8')
        del event_dict['image']
        to_log_dict = event_dict.copy()
        to_log_dict['data'] = '(%d bytes)' % len(event_dict['data'])
        logging.info('events_to_write_generator: ' + json.dumps(to_log_dict))
        event_json = json.dumps(event_dict)
        event_bytes = event_json.encode(encoding='UTF-8')
        event_to_write = pravega.pb.WriteEventsRequest(
            scope=scope,
            stream=stream,
            event=event_bytes,
            routing_key=str(args.camera),
        )
        yield event_to_write
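
# Each event written to Pravega is a JSON document roughly like the following
# (field values are illustrative; 'data' is the base64-encoded PNG of the frame):
#   {"frameNumber": 42, "timestamp": 1583853240000, "distance": 0.08,
#    "has_motion": true, "alert": true, "camera": 0, "ssrc": 0, "data": "iVBORw0..."}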


def run_write_to_pravega_process(queue, scope, stream, args):
    """Read events from a queue and write them to Pravega."""
    logging.info('run_write_to_pravega_process: BEGIN')
    logging.info('run_write_to_pravega_process: args=%s' % str(args))
    while True:
        try:
            with grpc.insecure_channel(args.gateway) as pravega_channel:
                pravega_client = pravega.grpc.PravegaGatewayStub(pravega_channel)
                if args.create_scope:
                    request = pravega.pb.CreateScopeRequest(scope=args.scope)
                    logging.info('run_write_to_pravega_process: CreateScope request=%s' % request)
                    response = pravega_client.CreateScope(request)
                    logging.info('run_write_to_pravega_process: CreateScope response=%s' % response)
                if args.create_stream:
                    request = pravega.pb.CreateStreamRequest(
                        scope=scope,
                        stream=stream,
                        scaling_policy=pravega.pb.ScalingPolicy(min_num_segments=1),
                    )
                    logging.info('run_write_to_pravega_process: CreateStream request=%s' % request)
                    response = pravega_client.CreateStream(request)
                    logging.info('run_write_to_pravega_process: CreateStream response=%s' % response)
                # Blocks on queue.get; a None sentinel would end the iterator and the write call.
                queue_iter = iter(queue.get, None)
                write_response = pravega_client.WriteEvents(
                    events_to_write_generator(queue_iter, scope, stream, args))
                logging.info('run_write_to_pravega_process: WriteEvents response=%s' % write_response)
        except Exception:
            # On any failure, log and retry the gateway connection after a short delay.
            logging.error('run_write_to_pravega_process: ' + traceback.format_exc())
            time.sleep(5)


def main():
    logging.basicConfig(level=logging.DEBUG)
    logging.info('BEGIN')
    parser = argparse.ArgumentParser()
    parser.add_argument('--camera', type=int, default=0)
    parser.add_argument('--no_create_scope', dest='create_scope', action='store_false')
    parser.add_argument('--no_create_stream', dest='create_stream', action='store_false')
    parser.add_argument('--gateway', default='docker1:54672')
    parser.add_argument('--goaway_url', default='http://jetson:5001/cataway/goaway')
    parser.add_argument('--scope', default='examples')
    parser.add_argument('--stream', default='jetsoncamera1')
    parser.add_argument('--alert_stream', default='alert1')
    parser.add_argument('--motion_threshold', type=float, default=0.0)
    args = parser.parse_args()
    logging.info('args=%s' % str(args))

    # We use multiprocessing queues so that capture and inference can continue
    # even if the writes to Pravega fail.
    write_to_pravega_queue = None
    if args.stream:
        write_to_pravega_queue = multiprocessing.Queue(1)
        write_to_pravega_process = multiprocessing.Process(
            target=run_write_to_pravega_process,
            args=(write_to_pravega_queue, args.scope, args.stream, args))
        write_to_pravega_process.start()

    write_alerts_to_pravega_queue = None
    if args.alert_stream:
        write_alerts_to_pravega_queue = multiprocessing.Queue(5)
        write_alerts_to_pravega_process = multiprocessing.Process(
            target=run_write_to_pravega_process,
            args=(write_alerts_to_pravega_queue, args.scope, args.alert_stream, args))
        write_alerts_to_pravega_process.start()

    # GStreamer pipeline for the Jetson camera: capture 1920x1080 at 2 fps and
    # scale down to 320x200 BGR frames for OpenCV.
    gscmd = """
        nvarguscamerasrc
        ! video/x-raw(memory:NVMM), width=1920, height=1080, framerate=2/1, format=NV12
        ! nvvidconv flip-method=0
        ! video/x-raw, width=320, height=200, format=BGRx
        ! videoconvert
        ! video/x-raw, format=BGR
        ! appsink
        """
    vidcap = cv2.VideoCapture(gscmd, cv2.CAP_GSTREAMER)
    logging.info(vidcap)

    video_frames = video_capture_generator(vidcap, args)
    video_frames_with_inference = inference_generator(video_frames, args)
    for video_frame in video_frames_with_inference:
        try:
            # Non-blocking puts: drop frames rather than stall capture when a writer falls behind.
            if write_alerts_to_pravega_queue and video_frame['alert']:
                write_alerts_to_pravega_queue.put(video_frame, False)
            if write_to_pravega_queue:
                write_to_pravega_queue.put(video_frame, False)
        except queue.Full:
            logging.warning('Pravega queue is full')
    logging.info('END')


if __name__ == "__main__":
    main()