Skip to content

Instantly share code, notes, and snippets.

@alfonsrv
Last active April 26, 2024 22:32
Show Gist options
  • Save alfonsrv/a788f8781fb1616e81a6b9cebf1ea2fa to your computer and use it in GitHub Desktop.
Save alfonsrv/a788f8781fb1616e81a6b9cebf1ea2fa to your computer and use it in GitHub Desktop.
Motion detection with OpenCV + grab static snapshot every 1 second from RTSP video stream
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) Rau Systemberatung GmbH (rausys.de)
# MIT License
# credits: https://pyimagesearch.com/start-here/
import argparse
import os
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import imutils
import time
import cv2
from settings import STREAM_URL, CAMERA_WIDTH, MIN_AREA, MAX_AREA, DEBUG, \
REFERENCE_RELEVANT, RELEVANT_DEBOUNCE, OUTPUT_BACKLOG, \
OUTPUT_INTERVAL, OUTPUT_PATH, OUTPUT_STATIC
# --- Command line interface ---------------------------------------------
# Values imported from settings.py act as defaults; options given on the
# command line take precedence.
argparser = argparse.ArgumentParser()
argparser.add_argument(
    '-s', '--stream',
    type=str,
    nargs='?',
    default=STREAM_URL,
    help='Stream URL (RTSP) to get video feed from',
)
argparser.add_argument('-w', '--window', action='store_true',
                       help='Show relevant feeds as X11 window')
argparser.add_argument('--debug', action='store_true',
                       help='Output more information')
args = argparser.parse_args()

# --debug can only switch debugging on; without it the settings value stands.
if args.debug:
    DEBUG = True

# --- Mutable module state used by the capture loop ----------------------
# Moment the periodic static snapshot was last written.
last_static = datetime.now()
# Countdown of motion-free frames; gives more consistent handling of a
# "continuous relevant event".
debounce_counter = 0
@dataclass
class ReferenceFrames:
    """Keep the reference frame and buffer the frames of a motion event.

    The reference frame is what the current camera image is diffed against.
    Frames belonging to an ongoing motion event are collected in ``previous``
    as ``[frame, contour_area]`` pairs until ``unbuffer_previous`` picks the
    most relevant one.
    """
    # Reference image the current camera image is compared with.
    frame: object = None
    # When ``frame`` was last replaced; ``None`` until a reference is set.
    # A default is required so that ``repr()`` of a fresh instance works.
    timestamp: datetime = field(init=False, default=None)
    # Buffered ``[frame, contour_area]`` pairs of the current motion event.
    previous: list = field(default_factory=list)
    # Frame selected by the most recent ``unbuffer_previous`` call.
    latest_capture: object = None

    def set_frame(self, frame: object):
        """Replace the reference frame if it is missing or outdated.

        The reference is replaced when none is stored yet, when it has no
        timestamp, or when it is at least ``REFERENCE_RELEVANT`` minutes old.
        """
        outdated = (
            self.frame is None
            or self.timestamp is None
            or self.timestamp <= datetime.now() - timedelta(minutes=REFERENCE_RELEVANT)
        )
        if outdated:
            self._set_frame(frame=frame)

    def _set_frame(self, frame: object):
        """Unconditionally store ``frame`` as the reference and stamp it."""
        if DEBUG:
            print('Updating reference frame')
        self.frame = frame
        self.timestamp = datetime.now()

    def append(self, frame: object, contour: int, contour_index: int, contour_amount: int):
        """Buffer ``frame`` together with its contour area.

        ``contour_index`` / ``contour_amount`` are only used for the debug
        output. In debug mode the frame is additionally written to disk.
        """
        # Improvement idea: Constant rolling buffer - as soon as occupied=True
        self.previous.append([frame, contour])
        if DEBUG:
            print(f'[{contour_index+1}/{contour_amount}] {contour}')
            self.save_image(frame=frame, contour=contour)

    def unbuffer_previous(self):
        """Empty the buffer and keep the frame with the biggest movement.

        Entries whose contour area is not below ``MAX_AREA`` are dropped
        first. If fewer than four entries remain, the buffer is discarded
        without selecting a frame. Otherwise the entry with the largest
        contour area becomes ``latest_capture`` and, when ``OUTPUT_BACKLOG``
        is set, is written to ``<OUTPUT_PATH>/backlog``.
        """
        if not self.previous:
            return

        self.previous = [entry for entry in self.previous if entry[1] < MAX_AREA]
        if len(self.previous) < 4:
            if DEBUG:
                print('Too few pictures to be considered motion event; discarding')
            self.previous = list()
            return

        # For long events, drop the first and last quarter so the pick comes
        # from the middle of the event.
        image_amount = len(self.previous) // 4
        if image_amount > 4:
            self.previous = self.previous[image_amount:len(self.previous) - image_amount]

        frame, contour = max(self.previous, key=lambda entry: entry[1])
        self.latest_capture = frame
        self.previous = list()

        if OUTPUT_BACKLOG:
            file_name = f'{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.jpg'
            print(f'Saving image to backlog: {file_name}')
            backlog_path = os.path.join(OUTPUT_PATH, 'backlog')
            os.makedirs(backlog_path, exist_ok=True)
            cv2.imwrite(os.path.join(backlog_path, file_name), frame)
        if DEBUG:
            self.save_image(frame=frame, contour=contour, file_prefix='candidate')

    @staticmethod
    def _build_file_name(file_prefix: str, timestamp: str) -> str:
        """Return ``<prefix>-<timestamp>.jpg``, or ``<timestamp>.jpg`` without prefix."""
        if file_prefix:
            file_prefix = f'{file_prefix}-'
        return f'{file_prefix}{timestamp}.jpg'

    @staticmethod
    def save_image(frame, contour, file_prefix = ''):
        """Write ``frame`` as a timestamped JPEG, labelled with ``contour``.

        The file name is relative, so the image is written to the current
        working directory. The label is drawn onto ``frame`` itself, i.e. the
        caller's image is modified.
        """
        timestamp = str(datetime.now().strftime('%Y%m%d%H%M%S'))
        # The prefix used to be formatted but never assigned, so the '-'
        # separator was silently missing from the file name.
        file_name = ReferenceFrames._build_file_name(file_prefix, timestamp)
        print(f'{timestamp} Candidate Contour: {contour}')
        print(f'--> saving file to {file_name}')
        cv2.putText(frame, "Contours: {}".format(contour), (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
        cv2.imwrite(file_name, frame)
def get_stream():
    """Open a video capture for the stream URL in the module-level ``args.stream``.

    When no URL is configured, a message is printed and the process exits
    with status 1.

    Returns:
        The ``cv2.VideoCapture`` created for ``args.stream``.

    Raises:
        SystemExit: if ``args.stream`` is empty or ``None``.
    """
    if not args.stream:
        print('Stream URI for RTSP server not specified! Exiting')
        # Raise SystemExit directly: the `exit` builtin is injected by the
        # `site` module for interactive use and is not guaranteed to exist
        # (e.g. under `python -S`).
        raise SystemExit(1)
    return cv2.VideoCapture(args.stream)
if __name__ == '__main__':
    # Capture loop: read frames from the stream, diff each one against the
    # reference frame, buffer frames that show relevant motion and
    # periodically write static snapshots.
    print('Initializing stream...')
    frames = ReferenceFrames()
    vs = get_stream()
    try:
        while True:
            # grab the current frame
            retrieved, full_frame = vs.read()
            if not retrieved:
                print('Error retrieving image from stream; reinitializing')
                # Free the broken capture before opening a new one so its
                # handle is not leaked, and pause briefly so an unreachable
                # stream does not turn this into a busy loop.
                vs.release()
                time.sleep(1)
                vs = get_stream()
                continue
            if full_frame is None: continue

            # initialize the occupied/unoccupied state for this frame
            occupied = False

            # resize the frame, convert it to grayscale, and blur it
            scaled_frame = imutils.resize(full_frame, width=CAMERA_WIDTH)
            gray = cv2.cvtColor(scaled_frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.GaussianBlur(gray, (21, 21), 0)

            # if there is no reference frame yet, initialize it
            if frames.frame is None:
                frames.set_frame(frame=gray)
                continue

            # compute the absolute difference between the current frame and
            # the reference frame
            frameDelta = cv2.absdiff(frames.frame, gray)
            thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]

            # dilate the thresholded image to fill in holes, then find contours
            # on thresholded image
            thresh = cv2.dilate(thresh, None, iterations=2)
            contours = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            contours = imutils.grab_contours(contours)

            # if the contour is too small, ignore it (area computed once per contour)
            sized_contours = [(c, cv2.contourArea(c)) for c in contours]
            sized_contours = [(c, area) for c, area in sized_contours if area > MIN_AREA]
            relevant_contours = [c for c, _ in sized_contours]
            contour_sizes = [area for _, area in sized_contours]

            for i, (contour, contour_size) in enumerate(sized_contours):
                # reset reference picture; this is to help detect if there's actual motion
                # if multiple consecutive pictures change, it's likely we are dealing with motion
                frames._set_frame(frame=gray)

                # compute the bounding box for the contour, draw it on the frame,
                # and update the status
                (x, y, w, h) = cv2.boundingRect(contour)
                cv2.rectangle(scaled_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                debounce_counter = RELEVANT_DEBOUNCE
                occupied = True
                frames.append(
                    frame=scaled_frame,
                    contour=contour_size,
                    contour_index=i,
                    contour_amount=len(relevant_contours)
                )

            if not occupied:
                # count down motion-free frames; once the counter reaches
                # zero the motion event is finished and gets evaluated
                if debounce_counter > 0: debounce_counter -= 1
                if not debounce_counter:
                    frames.set_frame(frame=gray)
                    frames.unbuffer_previous()

            # save static images of the stream every OUTPUT_INTERVAL seconds
            if OUTPUT_STATIC and last_static < datetime.now() - timedelta(seconds=OUTPUT_INTERVAL):
                last_static = datetime.now()
                cv2.imwrite(os.path.join(OUTPUT_PATH, 'latest.jpg'), scaled_frame)
                cv2.imwrite(os.path.join(OUTPUT_PATH, 'latest_full.jpg'), full_frame)
                smol_frame = imutils.resize(full_frame, width=500)
                cv2.imwrite(os.path.join(OUTPUT_PATH, 'latest_thumb.jpg'), smol_frame)

            if DEBUG and args.window:
                cv2.putText(full_frame, "Contours: {}".format(contour_sizes), (10, 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
                cv2.imshow("Thresh", thresh)
                cv2.imshow("Frame Delta", frameDelta)
                cv2.imshow("Reference frame", frames.frame)

            # show the frame and record if the user presses a key
            if args.window:
                cv2.imshow("Security Feed", full_frame)
                if frames.latest_capture is not None:
                    cv2.imshow("Captured", frames.latest_capture)
                key = cv2.waitKey(1) & 0xFF
                # if the `q` key is pressed, break from the loop
                if key == ord("q"):
                    break
    finally:
        # cleanup the camera and close any open windows; runs on Ctrl+C too,
        # which is the only way to stop the loop when no window is shown
        vs.release()
        if args.window:
            cv2.destroyAllWindows()
imutils
opencv-python
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) Rau Systemberatung GmbH (rausys.de)
# MIT License
import os

try:
    from distutils.util import strtobool
except ImportError:
    # distutils was removed from the standard library in Python 3.12;
    # provide an equivalent so the module keeps importing.
    def strtobool(val):
        """Convert a string representation of truth to 1 (true) or 0 (false).

        True values are 'y', 'yes', 't', 'true', 'on' and '1'; false values
        are 'n', 'no', 'f', 'false', 'off' and '0' (case-insensitive).

        Raises:
            ValueError: if ``val`` is anything else.
        """
        val = val.lower()
        if val in ('y', 'yes', 't', 'true', 'on', '1'):
            return 1
        if val in ('n', 'no', 'f', 'false', 'off', '0'):
            return 0
        raise ValueError('invalid truth value %r' % (val,))

# URL to grab and analyze stream from. Optional here (None when CAMERA_URL is
# unset) so the URL can be supplied via the --stream command line option
# instead of failing with a KeyError on import.
STREAM_URL = os.environ.get('CAMERA_URL')

# Resize original picture to (if changed MIN_AREA+MAX_AREA & CROP_X should be changed too)
CAMERA_WIDTH = int(os.environ.get('CAMERA_WIDTH', 1080))

# Minutes the reference image is considered relevant;
# important for accurate results during day-night transition
REFERENCE_RELEVANT = int(os.environ.get('CAMERA_REFERENCE_REFRESH', 10))

# How many frames in order have to be irrelevant to consider
# a movement event finished
RELEVANT_DEBOUNCE = 20

# Movement detection
# MIN_AREA: how many pixels have to change in order to be considered movement
# MAX_AREA: maximum pixels changing - everything above is considered a change in scenery (e.g. light on during night),
#           which results in not being considered for being a valid candidate
MIN_AREA: int = int(os.environ.get('CAMERA_MIN_THRESHOLD', 12000))
MAX_AREA: int = int(os.environ.get('CAMERA_MAX_THRESHOLD', 115000))

# Periodically save static image of stream
# (strtobool returns 1/0, so convert to a real bool to match the annotation)
OUTPUT_STATIC: bool = bool(strtobool(os.environ.get('CAMERA_OUTPUT', 'True')))
OUTPUT_PATH: str = os.environ.get('CAMERA_OUTPUT_PATH', '.')
OUTPUT_INTERVAL: int = int(os.environ.get('CAMERA_OUTPUT_INTERVAL', 1))

# Save most relevant static image of captured motion events
OUTPUT_BACKLOG: bool = bool(strtobool(os.environ.get('CAMERA_BACKLOG', 'True')))

DEBUG: bool = bool(strtobool(os.environ.get('CAMERA_DEBUG', 'False')))
@OttomanZ
Copy link

Thank you, saved me a heck of headache and time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment