Motion detection with OpenCV + grab static snapshot every 1 second from RTSP video stream
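The script keeps a grayscale reference frame and flags motion whenever the current frame differs from it by a sufficiently large contiguous area. Below is a minimal standalone sketch of that core idea, assuming OpenCV 4.x and two hypothetical snapshots of the same scene saved as reference.jpg and current.jpg (names made up for illustration); the full script follows.

import cv2

# Hypothetical input files; any two frames of the same scene work
reference = cv2.cvtColor(cv2.imread('reference.jpg'), cv2.COLOR_BGR2GRAY)
current = cv2.cvtColor(cv2.imread('current.jpg'), cv2.COLOR_BGR2GRAY)

# Blur to suppress sensor noise, then diff, threshold and dilate
reference = cv2.GaussianBlur(reference, (21, 21), 0)
current = cv2.GaussianBlur(current, (21, 21), 0)
delta = cv2.absdiff(reference, current)
thresh = cv2.threshold(delta, 25, 255, cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)

# Every sufficiently large external contour counts as a moving region
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
moving = [c for c in contours if cv2.contourArea(c) > 500]  # 500 px is an arbitrary demo threshold
print(f'{len(moving)} region(s) changed significantly')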
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) Rau Systemberatung GmbH (rausys.de)
# MIT License
# credits: https://pyimagesearch.com/start-here/
import argparse
import os
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import imutils
import time
import cv2
from settings import STREAM_URL, CAMERA_WIDTH, MIN_AREA, MAX_AREA, DEBUG, \
    REFERENCE_RELEVANT, RELEVANT_DEBOUNCE, OUTPUT_BACKLOG, \
    OUTPUT_INTERVAL, OUTPUT_PATH, OUTPUT_STATIC

argparser = argparse.ArgumentParser()
argparser.add_argument(
    '-s', '--stream',
    help='Stream URL (RTSP) to get video feed from',
    nargs='?', type=str,
    default=STREAM_URL
)
argparser.add_argument(
    '-w', '--window',
    help='Show relevant feeds as X11 window',
    action='store_true')
argparser.add_argument(
    '--debug',
    help='Output more information',
    action='store_true'
)
args = argparser.parse_args()
DEBUG = args.debug or DEBUG

# Helper variables:
# last static saving tracking
last_static = datetime.now()
# allow for more consistent "continuous relevant event" handling
debounce_counter = 0
@dataclass
class ReferenceFrames:
    """ Helper class to manage frames """
    frame: object = None
    timestamp: datetime = field(init=False)
    previous: list = field(default_factory=lambda: [])
    latest_capture: object = None

    def set_frame(self, frame: object):
        """ Sets reference frame which is used to calculate difference
        from the current camera image """
        if self.frame is None or self.timestamp <= datetime.now() - timedelta(minutes=REFERENCE_RELEVANT):
            self._set_frame(frame=frame)

    def _set_frame(self, frame: object):
        if DEBUG: print('Updating reference frame')
        self.frame = frame
        self.timestamp = datetime.now()

    def append(self, frame: object, contour: int, contour_index: int, contour_amount: int):
        # Improvement idea: Constant rolling buffer - as soon as occupied=True
        self.previous.append([frame, contour])
        if DEBUG:
            print(f'[{contour_index+1}/{contour_amount}] {contour}')
            self.save_image(frame=frame, contour=contour)
    def unbuffer_previous(self):
        """ Clean the previous images from buffer and get the most relevant
        photo based on the biggest movement-amount """
        if not self.previous: return
        self.previous = [f for f in self.previous if f[1] < MAX_AREA]
        if len(self.previous) < 4:
            if DEBUG: print('Too few pictures to be considered a motion event; discarding')
            self.previous = list()
            return
        # drop the first and last quarter of the buffer to keep the most relevant pictures
        image_amount = len(self.previous) // 4
        if image_amount > 4:
            self.previous = self.previous[image_amount:len(self.previous) - image_amount]
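        # Worked example (hypothetical numbers): with 24 buffered frames,
        # image_amount = 24 // 4 = 6 (> 4), so the first and last 6 frames are
        # dropped; the frame with the largest contour area among the remaining
        # 12 becomes the candidate selected below.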
        frame, contour = max(self.previous, key=lambda x: x[1])
        self.latest_capture = frame
        self.previous = list()
        if OUTPUT_BACKLOG:
            file_name = f'{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.jpg'
            print(f'Saving image to backlog: {file_name}')
            backlog_path = os.path.join(OUTPUT_PATH, 'backlog')
            os.makedirs(backlog_path, exist_ok=True)
            cv2.imwrite(os.path.join(backlog_path, file_name), frame)
        if DEBUG: self.save_image(frame=frame, contour=contour, file_prefix='candidate')
    @staticmethod
    def save_image(frame, contour, file_prefix=''):
        if file_prefix: file_prefix = f'{file_prefix}-'  # separate prefix from timestamp in the file name
        timestamp = str(datetime.now().strftime('%Y%m%d%H%M%S'))
        file_name = f'{file_prefix}{timestamp}.jpg'
        print(f'{timestamp} Candidate Contour: {contour}')
        print(f'--> saving file to {file_name}')
        cv2.putText(frame, "Contours: {}".format(contour), (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
        cv2.imwrite(file_name, frame)
def get_stream():
    if not args.stream:
        print('Stream URI for RTSP server not specified! Exiting')
        exit(1)
    return cv2.VideoCapture(args.stream)
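# Note (assumption, untested with this script): if the RTSP feed drops frames
# over UDP, the FFmpeg backend of cv2.VideoCapture can usually be forced to TCP
# transport by setting the environment variable OPENCV_FFMPEG_CAPTURE_OPTIONS
# to 'rtsp_transport;tcp' before starting the process.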
if __name__ == '__main__':
    print('Initializing stream...')
    frames = ReferenceFrames()
    vs = get_stream()
    while True:
        # grab the current frame from the stream
        retrieved, full_frame = vs.read()
        if not retrieved:
            print('Error retrieving image from stream; reinitializing')
            vs = get_stream()
            continue
        if full_frame is None: continue
        occupied = False
        # resize the frame, convert it to grayscale, and blur it
        scaled_frame = imutils.resize(full_frame, width=CAMERA_WIDTH)
        y, x, channels = scaled_frame.shape
        #frame = full_frame[:, START_CROP_X:x]
        frame = scaled_frame.copy()
        # src_cropped = src[top_margin:src.shape[0], :]
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (21, 21), 0)
        # if there is no reference frame yet, initialize it
        if frames.frame is None:
            frames.set_frame(frame=gray)
            continue
        # compute the absolute difference between the current frame and the reference frame
        frameDelta = cv2.absdiff(frames.frame, gray)
        thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
        # dilate the thresholded image to fill in holes, then find contours
        # on the thresholded image
        thresh = cv2.dilate(thresh, None, iterations=2)
        contours = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        contours = imutils.grab_contours(contours)
        # ignore contours that are too small to be considered movement
        relevant_contours = [c for c in contours if cv2.contourArea(c) > MIN_AREA]
        contour_sizes = [cv2.contourArea(c) for c in relevant_contours]
        for i, (contour, contour_size) in enumerate(zip(relevant_contours, contour_sizes)):
            # reset reference picture; this helps detect actual motion:
            # if multiple consecutive pictures change, it's likely we are dealing with motion
            frames._set_frame(frame=gray)
            # compute the bounding box for the contour, draw it on the frame,
            # and update the status
            (x, y, w, h) = cv2.boundingRect(contour)
            # x = x + START_CROP_X  # ensure relative boxes are rendered properly
            cv2.rectangle(scaled_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
            debounce_counter = RELEVANT_DEBOUNCE
            occupied = True
            frames.append(
                frame=scaled_frame,
                contour=contour_size,
                contour_index=i,
                contour_amount=len(relevant_contours)
            )
        if not occupied:
            if debounce_counter > 0: debounce_counter -= 1
            if not debounce_counter:
                frames.set_frame(frame=gray)
                frames.unbuffer_previous()
        # save a static snapshot every OUTPUT_INTERVAL seconds
        if OUTPUT_STATIC and last_static < datetime.now() - timedelta(seconds=OUTPUT_INTERVAL):
            last_static = datetime.now()
            cv2.imwrite(os.path.join(OUTPUT_PATH, 'latest.jpg'), scaled_frame)
            cv2.imwrite(os.path.join(OUTPUT_PATH, 'latest_full.jpg'), full_frame)
            smol_frame = imutils.resize(full_frame, width=500)
            cv2.imwrite(os.path.join(OUTPUT_PATH, 'latest_thumb.jpg'), smol_frame)
        if DEBUG and args.window:
            cv2.putText(full_frame, "Contours: {}".format(contour_sizes), (10, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
            cv2.imshow("Thresh", thresh)
            cv2.imshow("Frame Delta", frameDelta)
            cv2.imshow("Reference frame", frames.frame)
        # show the frame and check whether the user presses a key
        if args.window:
            cv2.imshow("Security Feed", full_frame)
            if frames.latest_capture is not None:
                cv2.imshow("Captured", frames.latest_capture)
            key = cv2.waitKey(1) & 0xFF
            # if the `q` key is pressed, break from the loop
            if key == ord("q"):
                break
    # cleanup the camera and close any open windows
    vs.release()  # vs.stop()
    cv2.destroyAllWindows()
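The gist does not name the script file; assuming it is saved as, say, motion.py next to the two supporting files below (the pip requirements and the settings.py module the import at the top expects), it runs headless with only CAMERA_URL set in the environment, while the --window and --debug flags additionally open the security feed, threshold, frame-delta and reference-frame windows on a desktop. The -s/--stream argument overrides STREAM_URL from settings.py for a one-off run.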
imutils
opencv-python
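imutils supplies the resize and grab_contours helpers used above, and opencv-python provides cv2 including the GUI functions needed for --window mode. On a headless server the opencv-python-headless package could presumably be substituted, at the cost of losing the cv2.imshow debug windows. The settings module the script imports follows.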
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) Rau Systemberatung GmbH (rausys.de)
# MIT License
import os
from distutils.util import strtobool

# URL to grab and analyze the stream from
STREAM_URL = os.environ['CAMERA_URL']
# Width the original picture is resized to (if changed, MIN_AREA, MAX_AREA and the crop values should be adjusted too)
CAMERA_WIDTH = int(os.environ.get('CAMERA_WIDTH', 1080))
# Minutes the reference image is considered relevant;
# important for accurate results during day-night transitions
REFERENCE_RELEVANT = int(os.environ.get('CAMERA_REFERENCE_REFRESH', 10))
# How many consecutive frames have to show no relevant motion before
# a movement event is considered finished
RELEVANT_DEBOUNCE = 20
# Movement detection
# MIN_AREA: minimum contour area (in pixels) that has to change to be considered movement
# MAX_AREA: maximum changing area - everything above is treated as a change in scenery (e.g. a light switched on at night)
#           and is therefore not considered a valid candidate
MIN_AREA: int = int(os.environ.get('CAMERA_MIN_THRESHOLD', 12000))
MAX_AREA: int = int(os.environ.get('CAMERA_MAX_THRESHOLD', 115000))
# Periodically save a static image of the stream
OUTPUT_STATIC: bool = strtobool(os.environ.get('CAMERA_OUTPUT', 'True'))
OUTPUT_PATH: str = os.environ.get('CAMERA_OUTPUT_PATH', '.')
OUTPUT_INTERVAL: int = int(os.environ.get('CAMERA_OUTPUT_INTERVAL', 1))
# Save the most relevant static image of captured motion events
OUTPUT_BACKLOG: bool = strtobool(os.environ.get('CAMERA_BACKLOG', 'True'))
DEBUG: bool = strtobool(os.environ.get('CAMERA_DEBUG', 'False'))
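A small, hypothetical sanity check of how the settings resolve when only CAMERA_URL is defined (the RTSP address is made up; everything else falls back to the defaults above, and the snippet must run from the directory containing settings.py). Note that distutils, and with it strtobool, is removed in Python 3.12, so this module as written assumes an older interpreter.

import os
os.environ['CAMERA_URL'] = 'rtsp://user:pass@192.168.1.10:554/stream'  # made-up camera address

import settings  # must be imported after CAMERA_URL is set
print(settings.CAMERA_WIDTH)     # 1080  (default)
print(settings.MIN_AREA)         # 12000 (default)
print(settings.OUTPUT_INTERVAL)  # 1     (default, in seconds)
print(settings.OUTPUT_STATIC)    # 1     (strtobool returns an int, not a bool)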
Thank you, saved me a heck of a headache and time.