Created
August 20, 2023 14:13
-
-
Save Mahyar24/e51367e1411ffdd75b97efdd7c361c2c to your computer and use it in GitHub Desktop.
Detecting motion, and then recording, streaming and sending an SMS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3.10 | |
""" | |
This module is used for detecting motion, and then recording, streaming and sending an SMS. | |
Install packages beforehand via: `pip install khayyam pytz requests opencv-python` | |
FFmpeg is required for streaming, and it should be in your PATH. | |
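Remember to fill in these items:
- streaming link: the `SpyCam.link` class attribute
- SMS sender and receiver numbers: `lineNumber` and `mobiles` in `SpyCam.payload`
- SMS API key: `X-API-KEY` in `SpyCam.headers` (I'm using SMS.ir API)
- SMS API link: the URL passed to `requests.post` in `SpyCam._send_sms`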
GitHub: https://github.com/Mahyar24
[email protected], Mon 6 May 2019 | |
""" | |
import json | |
import logging | |
import os | |
import subprocess | |
import threading | |
from datetime import datetime, timedelta | |
from time import struct_time | |
import cv2 | |
import requests | |
from khayyam import JalaliDatetime | |
from pytz import timezone | |
def tehran_time(*_, **__):
    """Return the current Asia/Tehran time as a Jalali-calendar ``struct_time``.

    Used as a time converter for log records so that timestamps are written
    with Jalali dates. Any positional or keyword arguments are accepted and
    ignored.
    """
    now = JalaliDatetime.now(tz=timezone("Asia/Tehran"))
    fields = (
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
        now.weekday(),
        now.dayofyear(),
        1,  # tm_isdst is always reported as 1
    )
    return struct_time(fields)
# Never let a failure inside the logging machinery propagate to the caller.
logging.raiseExceptions = False
# Make every Formatter render timestamps through tehran_time.
logging.Formatter.converter = tehran_time

# One shared format for both destinations: "<date time>:<message>."
handler_formatter = logging.Formatter(
    "%(asctime)s:%(message)s.", datefmt="%Y-%m-%d %H:%M:%S"
)
logger_file = logging.FileHandler("SpyCam.log")
logger_stream = logging.StreamHandler()

# Logger named after this script file; INFO and above go to the log file
# first and then to the stream handler.
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)
for _handler in (logger_file, logger_stream):
    _handler.setFormatter(handler_formatter)
    logger.addHandler(_handler)
class SpyCam:
    """Watch camera 0 for motion, then record, stream and send an SMS.

    While motion is present (and for ``timeout`` seconds after it was last
    seen) every frame is written to a local video file and piped to an
    ``ffmpeg`` subprocess that pushes it to ``link``. An SMS is sent when a
    new recording starts.

    Use the instance as a context manager so that the camera, the video
    writer and the ffmpeg process are cleaned up on exit.
    """

    # Replace these with your own values:
    link = "rtmp://[ip]/[stream]/[eye]?pwd=[password]"
    payload = {
        # Replace these with your own values:
        "lineNumber": 0000000,
        "mobiles": [
            "000000",
        ],
        "sendDateTime": None,
    }
    headers = {
        # Replace this with your own value:
        "X-API-KEY": "YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    # Seconds to wait for the SMS API; without a limit the sender thread
    # could block forever and keep the process from exiting.
    sms_timeout = 10

    def __init__(self, sensitivity=0.05, timeout=60, path=".", codec="XVID", fps=18):
        """Open the camera, start ffmpeg and build the background subtractor.

        Args:
            sensitivity: Positive number; larger values lower both the
                subtractor threshold and the motion ratio needed to trigger.
            timeout: Seconds without motion after which a recording is closed.
            path: Directory in which recordings are created.
            codec: Four-character code passed to ``cv2.VideoWriter_fourcc``.
            fps: Upper bound for the recording/streaming frame rate.

        Raises:
            ValueError: If ``sensitivity`` is not positive.
            RuntimeError: If the camera cannot be opened.
        """
        if sensitivity <= 0:
            # Every threshold below divides by sensitivity.
            raise ValueError("sensitivity must be a positive number")

        self.sensitivity = sensitivity
        self.timeout = timedelta(seconds=timeout)
        self.path = path
        self.is_recording = False
        self.last_seen = datetime.now(tz=timezone("Asia/Tehran"))
        self.frame = None
        self.fourcc = cv2.VideoWriter_fourcc(*codec)
        self.now_output = None
        self.now_output_name = ""

        self.cam = cv2.VideoCapture(0)
        if not self.cam.isOpened():
            self.cam.release()
            raise RuntimeError("Could not open camera 0")
        self.width = int(self.cam.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cam.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Use the slower of camera and requested rate; fall back to the
        # requested rate when the camera reports no usable FPS value.
        reported_fps = int(self.cam.get(cv2.CAP_PROP_FPS))
        requested_fps = int(fps)
        if reported_fps > 0:
            self.fps = float(min(reported_fps, requested_fps))
        else:
            self.fps = float(requested_fps)
        self.resolution = int(self.width * self.height)

        self.threshold = int(10 / self.sensitivity)
        self.timelapse = int(10 / self.sensitivity)
        self.subtractor = cv2.createBackgroundSubtractorMOG2(
            history=self.timelapse, varThreshold=self.threshold, detectShadows=True
        )

        # ffmpeg reads raw BGR frames from stdin and publishes them as FLV.
        self.command = [
            "ffmpeg",
            "-y",
            "-f",
            "rawvideo",
            "-vcodec",
            "rawvideo",
            "-pix_fmt",
            "bgr24",
            "-s",
            f"{self.width}x{self.height}",
            "-r",
            str(self.fps),
            "-i",
            "-",
            "-c:v",
            "libx264",
            "-pix_fmt",
            "yuv420p",
            "-preset",
            "ultrafast",
            "-f",
            "flv",
            self.link,
        ]
        self.ffmpeg = subprocess.Popen(
            self.command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args, **kwargs):
        """Release the camera, the open recording and the ffmpeg process.

        Exceptions raised inside the ``with`` body are not suppressed.
        """
        self.cam.release()
        if self.now_output is not None:
            # Finalise the file of a recording that may still be in progress.
            self.now_output.release()
        self.is_recording = False

        stdin = self.ffmpeg.stdin
        if stdin is not None:
            try:
                stdin.close()  # end of input lets ffmpeg finish the stream
            except OSError:
                pass  # ffmpeg already exited; nothing left to flush
        try:
            self.ffmpeg.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self.ffmpeg.kill()
            self.ffmpeg.wait()

    @staticmethod
    def time():
        """Return the current timezone-aware time in Asia/Tehran."""
        return datetime.now(tz=timezone("Asia/Tehran"))

    def output_name(self):
        """Return the current recording's file name.

        A new name, derived from ``last_seen`` in Jalali date format, is
        generated only while no recording is in progress.
        """
        if not self.is_recording:
            self.now_output_name = (
                JalaliDatetime(self.last_seen).strftime("%Y_%m_%d_%H_%M_%S") + ".avi"
            )
        return self.now_output_name

    def output(self):
        """Return the active ``cv2.VideoWriter``, creating one if idle."""
        if not self.is_recording:
            self.now_output = cv2.VideoWriter(
                os.path.join(self.path, self.output_name()),
                self.fourcc,
                self.fps,
                (self.width, self.height),
            )
        return self.now_output

    def seeing(self):
        """Grab one frame and return its foreground mask.

        The frame is stored in ``self.frame``.

        Raises:
            RuntimeError: If the camera did not deliver a frame.
        """
        ok, frame = self.cam.read()
        if not ok or frame is None:
            raise RuntimeError("Failed to read a frame from the camera")
        self.frame = frame
        grayed = cv2.cvtColor(self.frame, cv2.COLOR_BGR2GRAY)
        return self.subtractor.apply(grayed)

    def check(self):
        """Return True when the newest frame contains enough motion."""
        mask = self.seeing()
        ratio = cv2.countNonZero(mask) / self.resolution
        # At the first moment the motion ratio is exactly 1.0; ignore it.
        return ratio != 1.0 and ratio >= (0.0001 / self.sensitivity)

    def _send_sms(self):
        """POST ``payload`` to the SMS API and log the outcome.

        Runs in a worker thread; request failures are logged, not raised.
        """
        try:
            resp = requests.post(
                # Replace this with your own value:
                "https://api.sms.ir/v1/send/bulk",
                data=json.dumps(self.payload),
                headers=self.headers,
                timeout=self.sms_timeout,
            )
            resp.raise_for_status()
        except requests.exceptions.RequestException as exc:
            # There is no response at all when the connection itself failed.
            failed = exc.response
            detail = failed.text if failed is not None else str(exc)
            logger.error("SMS failed to send: %r", detail)
        else:
            logger.warning("SMS sent successfully: %r", resp.text)

    def send_sms(self):
        """Compose the alert text from ``last_seen`` and send it in a thread."""
        time = JalaliDatetime(self.last_seen).strftime("%Y,%m,%d - %H:%M:%S")
        self.payload[
            "messageText"
        ] = f"Movement Detected at {time}, Live streaming: {self.link}"
        threading.Thread(target=self._send_sms).start()

    def record_upload(self):
        """Send the current frame to ffmpeg and append it to the recording."""
        self.ffmpeg.stdin.write(self.frame.tobytes())
        self.output().write(self.frame)

    def guard(self):
        """Run the detection loop forever.

        Starts a recording (and sends an SMS) on motion, keeps recording
        until no motion has been seen for ``timeout``, then closes the file.
        """
        logger.warning("Start guarding")
        while True:
            motion = self.check()
            if not self.is_recording:
                if motion:
                    logger.warning("Motion Detected")
                    # Stamp the detection first so that the SMS text and the
                    # file name both carry this moment, not an earlier one.
                    self.last_seen = self.time()
                    self.send_sms()
                    self.record_upload()
                    self.is_recording = True
                continue

            if motion:
                self.record_upload()
                self.last_seen = self.time()
            elif (self.time() - self.last_seen) >= self.timeout:
                logger.warning("Camera Released")
                self.output().release()
                self.is_recording = False
            else:
                # Quiet, but still inside the grace period: keep recording.
                self.record_upload()
def main():
    """Guard with sensitivity 0.05 and a 30-second timeout until Ctrl+C."""
    with SpyCam(sensitivity=0.05, timeout=30) as cam:
        try:
            cam.guard()
        except KeyboardInterrupt:
            # Logged inside the with-block, i.e. before the camera is released.
            logger.warning("Receiving SIGINT Signal; Terminating")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment