Skip to content

Instantly share code, notes, and snippets.

@donrestarone
Last active February 3, 2025 23:28
Show Gist options
  • Save donrestarone/527293a0a8779dcfbb0311b3add6e78e to your computer and use it in GitHub Desktop.
Save donrestarone/527293a0a8779dcfbb0311b3add6e78e to your computer and use it in GitHub Desktop.
raspberry pi infrared sensor activated LED 'security light' - custom
#!/usr/bin/env python3
########################################################################
# Filename : SenseLED.py
# Description : Control led with infrared Motion sensor.
# author : www.freenove.com
# modification: @donrestarone
########################################################################
from gpiozero import LED,MotionSensor
import time
import datetime
from datetime import timezone
import subprocess
import psutil
import os
import requests
import json
import pathlib
import dropbox
# --- one-time hardware and configuration setup (runs at import time) ---
ledPin = 18  # GPIO pin driving the indicator LED
sensorPin = 17  # GPIO pin wired to the infrared motion sensor
led = LED(ledPin)
sensor = MotionSensor(sensorPin)

# Announce the boot BEFORE blocking on the sensor: the message asks the user
# to stand by, so it has to be visible while we are still waiting.
timestamp = datetime.datetime.now()
print("[INFO] booting sensor system, please stand by.. %s" % (timestamp))
# Blocks until the sensor reports no motion (presumably so monitoring starts
# from an idle reading - confirm against the sensor's warm-up behaviour).
sensor.wait_for_no_motion()

# Required configuration; a missing variable raises KeyError right here so a
# misconfigured device fails at startup instead of mid-incident.
api_key = os.environ['MONITORING_API_KEY']
api_base_url = os.environ['MONITORING_API_ENDPOINT']
this_client_id = os.environ['MONITORING_CLIENT_ID']

# Module-level placeholders. loop() assigns its own locals with these names
# (it never declares them global), so these values stay None.
client = None
motion_message = None
video_file_path = None
def _build_report(evidence, triggered_by, timestamp):
    """Build the incident payload consumed by perform_incident_reporting()."""
    return {
        'evidence': evidence,
        'triggered_by': triggered_by,
        'evidence_format': 'string',
        'evidence_metadata': {
            'timestamp_unix': str(timestamp.timestamp()),
            'timestamp_local': str(timestamp),
        },
    }


def _capture_video(video_file_path, mic_hardware_address, timeout):
    """Record an audio+video clip (ffmpeg `-t 5`) and return ffmpeg's exit code."""
    return subprocess.call([
        'ffmpeg',
        '-ar', '44100',
        '-ac', '1',
        '-f', 'alsa',
        '-i', mic_hardware_address,
        '-f', 'v4l2',
        '-c:v', 'h264',
        '-r', '30',
        '-s', '1920x1080',
        '-itsoffset', '0.5',
        '-i', '/dev/video0',
        '-t', '5',
        '-copyinkf',
        '-codec:v', 'copy',
        '-codec:a', 'aac',
        '-ab', '128k',
        '-g', '10',
        # ENSURE THIS PATH EXISTS!
        video_file_path,
    ], timeout=timeout)


def loop():
    """Run the monitoring loop forever.

    At boot: fetch the client configuration, perform a health check, ping
    dynamic DNS, report a 'system booted' incident and try to free Dropbox
    space. Then poll the motion sensor: on a rising edge switch the LED on
    and, when the client is enabled and armed, record a clip, report the
    incident and upload the clip (Dropbox first, Violet Rails as fallback).
    On a falling edge switch the LED off.

    Never returns; a KeyboardInterrupt propagates to the caller.
    """
    previousstate = False
    capture_process_timeout = 60  # in seconds
    timestamp = datetime.datetime.now()
    print("[INFO] booted sensor system, OK %s" % (timestamp))

    # initialize client <> server connection
    client = get_client_configuration()
    if client:
        try:
            perform_client_health_check(client, timestamp)
            ping_dynamic_dns(client)
            boot_report = _build_report('[INFO] system booted', 'system', timestamp)
            perform_incident_reporting(client, timestamp, boot_report)
        except Exception as e:
            # An unreachable server at boot must not stop local monitoring.
            print('[REMOTE ERROR] could not report boot due to: %s' % (e))
        try:
            # attempt to free up dropbox space once per boot
            detect_and_capture_dropbox_freeable_space(client)
        except Exception as e:
            print('[REMOTE ERROR] could not connect to dropbox due to: %s' %(e))
            # TODO: clear violet rails space instead
    else:
        # get_client_configuration() yields a falsy value when the server is
        # unreachable; the remote calls above would raise on it.
        print('[REMOTE ERROR] no client configuration at boot, continuing without remote reporting')

    while True:
        # make sure to not run out of storage
        detect_and_capture_freeable_space()
        # Read sensor state
        currentstate = sensor.motion_detected
        # NOTE(review): the configuration is re-fetched and a health check is
        # sent on every iteration, i.e. two HTTP requests per poll. Consider
        # throttling once the server-side expectations are confirmed.
        client = get_client_configuration()
        # Report the time of THIS check, not of the last motion/boot event.
        perform_client_health_check(client, datetime.datetime.now())

        # If the sensor is triggered
        if currentstate and not previousstate:
            led.on()
            timestamp = datetime.datetime.now()
            # Truthiness (instead of `!= False`) also rejects None.
            properties = client['attributes']['properties'] if client else {}
            monitoring_active = bool(properties.get('enabled', False) and properties.get('armed', False))
            if monitoring_active:
                ensure_video_camera_initialized()
            else:
                print('[INFO] remote monitoring not enabled')
            motion_message = "[ALERT] Motion Detected at %s" % (timestamp)
            print(motion_message)

            if monitoring_active:
                # hooks
                print("[INFO] enabled and armed, running hooks at %s" % (timestamp))
                unix_timestamp = timestamp.timestamp()
                video_file_path = '/home/donrestarone/motion_captures/motion-capture%03d.mp4' % unix_timestamp
                # ensure that there is a mic connected here via `lsusb`
                mic_hardware_address = 'hw:2'
                try:
                    # take video
                    print('[INFO] capturing video file: %s' % video_file_path)
                    exit_code = _capture_video(video_file_path, mic_hardware_address, capture_process_timeout)
                    if exit_code == 0:
                        print('[INFO] captured video file OK: %s' % video_file_path)
                    else:
                        # Do not claim success when ffmpeg reported a failure.
                        print('[ERROR] ffmpeg exited with code %s for video file: %s' % (exit_code, video_file_path))
                except Exception as e:
                    # Raised on timeout or when ffmpeg cannot be started.
                    print('[ERROR] saving video file: %s failed due to: %s' %(video_file_path, e))
                    os.system("sudo reboot")

                # post incident
                try:
                    report_json = _build_report(motion_message, 'infrared-motion', timestamp)
                    perform_incident_reporting(client, timestamp, report_json)
                    try:
                        upload_to_dropbox(client, timestamp, report_json, video_file_path)
                    except Exception as e:
                        print('[REMOTE ERROR] could not upload to dropbox due to: %s' %(e))
                        # send to violet rails instead
                        upload_video_file_to_violet_rails(client, timestamp, video_file_path)
                except Exception as e:
                    print('[REMOTE ERROR] could not report due to: %s' %(e))
                    os.system("sudo reboot")
            # Record previous state
            previousstate = True
        # If the sensor has returned to ready state
        elif not currentstate and previousstate:
            led.off()
            timestamp = datetime.datetime.now()
            print("[INFO] No Motion Detected at %s" % (timestamp))
            previousstate = False
        # Wait for 10 milliseconds
        time.sleep(0.01)
def destroy():
    """Close the LED and then the motion sensor device objects."""
    for device in (led, sensor):
        device.close()
def ensure_video_camera_initialized():
    """Reboot the machine unless the expected camera shows up in v4l2-ctl.

    Raises whatever subprocess.check_output raises when `v4l2-ctl
    --list-devices` fails, so a missing camera fails loudly instead of
    causing a boot loop.
    """
    # MUST match the attached camera model, otherwise the system will
    # bootloop! Introduced to reboot in case the camera hardware crashes.
    expected_model = "Razer Kiyo Pro"
    device_listing = str(subprocess.check_output(['v4l2-ctl', '--list-devices']))
    if expected_model not in device_listing:
        os.system("sudo reboot")
        return
    print("[INFO] camera OK")
def detect_and_capture_freeable_space(capture_dir='/home/donrestarone/motion_captures', threshold_percent=90):
    """Delete the recorded clips when the root filesystem is nearly full.

    capture_dir: directory whose regular files are removed.
    threshold_percent: usage of '/' (in percent) above which the purge runs.

    The previous implementation ran `rm <dir>/*` through subprocess without
    a shell, so the `*` was passed literally to rm and nothing was deleted.
    Files are now enumerated and removed from Python.
    """
    hdd = psutil.disk_usage('/')
    if hdd.percent <= threshold_percent:
        return
    print("[WARN] running out of storage, freeing space now")
    try:
        # ENSURE THIS PATH EXISTS!
        candidates = [entry for entry in pathlib.Path(capture_dir).iterdir() if entry.is_file()]
    except OSError as e:
        print('[ERROR] could not list %s due to: %s' % (capture_dir, e))
        return
    for entry in candidates:
        try:
            entry.unlink()
        except OSError as e:
            # Keep going: one undeletable file must not block the cleanup.
            print('[ERROR] could not delete %s due to: %s' % (entry, e))
    print("[INFO] storage space cleared in %s/* - OK" % (capture_dir))
# server connection code
def get_client_configuration(timeout=10):
    """Fetch this device's monitoring-client record from the server.

    timeout: seconds to wait for the HTTP request, so an unresponsive server
        cannot hang the monitoring loop.

    Returns the client dict, or False when the record cannot be fetched, is
    not found, or the client is disabled (in which case a reboot is
    requested first).
    """
    try:
        # TODO fix security vulnerability - this endpoint returns all monitoring clients, change violet rails code to allow for scoping by api_key_only
        monitoring_clients = requests.get(
            '%s/api/1/monitoring_clients' % (api_base_url),
            headers={"Authorization": 'Bearer %s' % (api_key)},
            timeout=timeout,
        )
        clients_struct = json.loads(monitoring_clients.content)['data']
        # filter to correct client, again this allows for each client to access clients they are not supposed to!
        this_client = next(
            (c for c in clients_struct if c['attributes']['properties']['client_uuid'] == this_client_id),
            None,
        )
        if this_client is None:
            print('[REMOTE ERROR] could not get client configuration: no client with uuid %s' % (this_client_id))
            return False
        # handle system shut off, while enabled: false the system will bootloop waiting for a wake signal!
        if this_client['attributes']['properties']['enabled'] == False:
            print("[REMOTE] client disabled, rebooting")
            os.system("sudo reboot")
            # os.system() returns while the reboot is still pending; return
            # False explicitly so callers' `!= False` checks do not let an
            # implicit None through.
            return False
        return this_client
    except Exception as e:
        print('[REMOTE ERROR] could not get client configuration: %s' %(e))
        return False
def perform_client_health_check(this_client, timestamp, timeout=10):
    """PATCH the client's record with a fresh `last_health_check` value.

    this_client: client dict from get_client_configuration(), or a falsy
        value (False/None) when no configuration is available.
    timestamp: value stored (stringified) as `last_health_check`.
    timeout: seconds to wait for the HTTP request.

    Returns False when there is no client or the client is disabled,
    otherwise None. Network errors are logged, never raised.
    """
    if not this_client:
        return False
    properties = this_client['attributes']['properties']
    if properties['enabled'] == False:
        return False
    client_id = this_client['id']
    endpoint = '%s/api/1/monitoring_clients/edit/%s' % (api_base_url, client_id)
    # construct json object with existing keys because violet rails clobbers existing keys on update TODO fix this behavior in the server
    # Work on a copy so the caller's client dict is not modified.
    attributes_json = dict(properties)
    attributes_json['last_health_check'] = str(timestamp)
    try:
        response = requests.patch(
            endpoint,
            json={'data': attributes_json},
            headers={"Authorization": 'Bearer %s' % (api_key)},
            timeout=timeout,
        )
        if response.status_code == 200:
            print("[REMOTE] performed client health check at %s" % (timestamp))
        else:
            print('[REMOTE ERROR] client health check returned HTTP %s' % (response.status_code))
    except Exception as e:
        print('[REMOTE ERROR] could not perform client health check: %s' %(e))
def perform_incident_reporting(this_client, timestamp, report_json, timeout=10):
    """POST an incident to the server using the server-provided schema.

    this_client: client dict; a falsy value skips reporting.
    timestamp: only used in the log line.
    report_json: dict with 'evidence', 'triggered_by', 'evidence_format'
        and 'evidence_metadata' keys.
    timeout: seconds to wait for each HTTP request.

    Network and parsing errors propagate to the caller.
    """
    if not this_client:
        print('[REMOTE ERROR] no client configuration, skipping incident reporting')
        return
    client_id = this_client['id']
    auth_headers = {"Authorization": 'Bearer %s' % (api_key)}
    schema_endpoint = '%s/api/1/monitoring_incidents/describe' %(api_base_url)
    # allows reporting schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
    schema_response = requests.get(schema_endpoint, headers=auth_headers, timeout=timeout)
    json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
    # write incident to latest schema
    json_schema['monitoring_client_id'] = client_id
    for key in ('evidence', 'triggered_by', 'evidence_format', 'evidence_metadata'):
        json_schema[key] = report_json[key]
    endpoint = '%s/api/1/monitoring_incidents' %(api_base_url)
    incident_response = requests.post(endpoint, json={'data': json_schema}, headers=auth_headers, timeout=timeout)
    if incident_response.ok:
        print("[REMOTE] performed incident reporting at %s" % (timestamp))
    else:
        # Do not log success when the server rejected the incident.
        print('[REMOTE ERROR] incident reporting returned HTTP %s' % (incident_response.status_code))
def upload_to_dropbox(this_client, timestamp, report_json, video_file_path):
    """Upload the clip at video_file_path to Dropbox as /<app_key>/<file name>.

    Credentials are read from the client's properties; an existing remote
    file with the same name is overwritten. timestamp and report_json are
    accepted but not used. Any error propagates to the caller.
    """
    props = this_client['attributes']['properties']
    dropbox_access_token = props['dropbox_api_key']
    app_key = props['dropbox_app_key']
    app_secret = props['dropbox_app_secret']

    local_file = pathlib.Path(video_file_path)
    file_name = video_file_path.rsplit('/', 1)[-1]
    targetfile = '/'.join(('', app_key, file_name))

    dbx = dropbox.Dropbox(
        oauth2_access_token=dropbox_access_token,
        app_key=app_key,
        app_secret=app_secret,
    )
    with local_file.open("rb") as video:
        print('[INFO] uploading video: %s to dropbox' %(file_name))
        payload = video.read()
        dbx.files_upload(payload, targetfile, mode=dropbox.files.WriteMode("overwrite"))
        print('[INFO] completed uploading video: %s to dropbox' %(file_name))
def detect_and_capture_dropbox_freeable_space(this_client, minimum_bytes=32000000):
    """Delete the /<app_key> Dropbox folder when free space is below a minimum.

    this_client: client dict carrying the Dropbox credentials.
    minimum_bytes: free-space floor in bytes (default 32MB).

    Any Dropbox or lookup error propagates to the caller.
    """
    props = this_client['attributes']['properties']
    app_key = props['dropbox_app_key']
    dbx = dropbox.Dropbox(
        oauth2_access_token=props['dropbox_api_key'],
        app_key=app_key,
        app_secret=props['dropbox_app_secret'],
    )
    # One request: used and allocated bytes come from the same snapshot.
    usage = dbx.users_get_space_usage()
    allocation = usage.allocation
    if allocation.is_individual():
        currently_allocated_bytes = allocation.get_individual().allocated
    elif allocation.is_team():
        currently_allocated_bytes = allocation.get_team().allocated
    else:
        # Unknown allocation type: treating it as 0 bytes allocated would
        # always look "full" and wipe the folder, so do nothing instead.
        print('[REMOTE] unknown dropbox allocation type, skipping purge')
        return
    if (currently_allocated_bytes - usage.used) < minimum_bytes:
        print('[REMOTE] dropbox storage minimums reached, purging old files')
        dbx.files_delete_v2('/' + app_key)
def ping_dynamic_dns(this_client, timeout=10):
    """Send this machine's IP addresses to the dynamic-DNS webhook.

    this_client: client dict; a falsy value skips the ping.
    timeout: seconds to wait for each HTTP request.

    Errors from the webhook POST are logged; errors from `hostname` or from
    the schema request propagate to the caller.
    """
    if not this_client:
        print('[REMOTE ERROR] no client configuration, skipping DDNS ping')
        return
    client_id = this_client['id']
    uuid = this_client['attributes']['properties']['client_uuid']
    webhook_authorization_key = uuid + '==' + api_key
    endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/dynamic_dns_lookup/webhook' % (api_base_url)
    # check_output returns bytes; decode and strip so the server receives
    # "a.b.c.d ..." rather than the repr "b'a.b.c.d \n'".
    ips = subprocess.check_output(['hostname', '--all-ip-addresses']).decode('utf-8', 'replace').strip()
    schema_endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/describe' %(api_base_url)
    # allows ddns schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
    schema_response = requests.get(
        schema_endpoint,
        headers={"Authorization": 'Bearer %s' % (api_key)},
        timeout=timeout,
    )
    json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
    # write lookup to latest schema
    json_schema['monitoring_client_id'] = client_id
    json_schema['ip'] = ips
    print(json_schema, 'dns ping')
    print("[INFO] performing DDNS ping")
    try:
        response = requests.post(
            endpoint,
            json=json_schema,
            headers={"Authorization": webhook_authorization_key},
            timeout=timeout,
        )
        if response.status_code == 200:
            print("[REMOTE] performed DDNS ping")
    except Exception as e:
        print('[REMOTE ERROR] could not perform DDNS ping: %s' %(e))
def upload_video_file_to_violet_rails(this_client, timestamp, video_file_path, timeout=60):
    """Upload a recorded clip to the Violet Rails video-upload webhook.

    this_client: client dict; a falsy value skips the upload.
    timestamp: recording time, sent as `local_time_of_recording`.
    video_file_path: path of the clip on disk.
    timeout: seconds to wait for each HTTP request.

    Errors opening or posting the file are logged; errors from the schema
    request propagate to the caller.
    """
    if not this_client:
        print('[REMOTE ERROR] no client configuration, skipping violet rails upload')
        return
    client_id = this_client['id']
    uuid = this_client['attributes']['properties']['client_uuid']
    webhook_authorization_key = uuid + '==' + api_key
    endpoint = '%s/api/1/monitoring_incidents/video_upload/webhook' % (api_base_url)
    schema_endpoint = '%s/api/1/monitoring_incidents/describe' %(api_base_url)
    # allows incident schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
    schema_response = requests.get(
        schema_endpoint,
        headers={"Authorization": 'Bearer %s' % (api_key)},
        timeout=timeout,
    )
    json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
    # write incident to latest schema
    json_schema['monitoring_client_id'] = client_id
    json_schema['evidence'] = 'see video file'
    json_schema['triggered_by'] = 'infrared-motion'
    json_schema['evidence_format'] = 'file'
    json_schema['local_file_name'] = video_file_path
    json_schema['local_time_of_recording'] = str(timestamp)
    print("[INFO] uploading evidence file to violet rails")
    try:
        # The with-block closes the file handle even when the POST fails.
        with open(video_file_path, 'rb') as video:
            response = requests.post(
                endpoint,
                files={'file': video},
                data=json_schema,
                headers={"Authorization": webhook_authorization_key},
                timeout=timeout,
            )
        print(response.content, 'VR upload')
        if response.status_code == 200:
            print("[INFO] uploaded evidence file to violet rails")
    except Exception as e:
        print('[REMOTE ERROR] could not upload evidence file to violet rails: %s' %(e))
def main():
    """Program entrance: run the monitoring loop until Ctrl-C is pressed."""
    print ('[INFO] Program is starting...')
    try:
        loop()
    except KeyboardInterrupt:
        # Ctrl-C ends the program: release the GPIO devices, then say goodbye.
        destroy()
        print("[INFO] Ending program")


if __name__ == '__main__':
    main()
@donrestarone
Copy link
Author

donrestarone commented Jan 19, 2025

readme

original script: https://gist.github.com/donrestarone/e2211040b94e0072e078d5d8ad201c65

first start

dropbox setup!

Screen Shot 2025-01-26 at 12 31 14 AM

to ensure dropbox file upload works, create an app and use the App key, App secret, and Generated access token (oauth2 API key) that is given there.

  • ensure dropbox python package is installed: sudo apt install python3-dropbox
  • ensure dropbox metadata is set on Violet Rails API namespace: monitoring_clients

want a display?

install the Freenove LCD and follow setup guide here: https://gist.github.com/donrestarone/812dcd467b0df8af6a13f398156c3a3b

server connection:

ensure that API endpoint, client API key and client ID are in the raspberry pi environment

nano .bashrc

export MONITORING_API_KEY="api-key-here"
export MONITORING_CLIENT_ID="client-id-here"
export MONITORING_API_ENDPOINT="https://monitoring.restarone.solutions"
  • in the raspberry pi: ensure motion_captures folder exists at the ~ level. eg: /home/donrestarone/motion_captures
  • ensure tmux is installed - sudo apt-get install tmux
  • at the ~ level create a bash script named monitoring.sh (don't forget to chmod +x monitoring.sh to make it executable) that starts tmux and kicks off the python monitoring service (monitoring.py)
  • at the ~ level copy paste the python source above to the python script monitoring.py
  • ensure the service file monitoring.service exists at /etc/systemd/system/monitoring.service
  • enable the monitoring.service by running: sudo systemctl start monitoring.service && sudo systemctl enable monitoring.service

monitoring.sh

#!/bin/bash
sessname="security"


# Create a new session named "$sessname", and run command
tmux new-session -d -s "$sessname"
tmux send-keys -t "$sessname" "python3 monitoring.py" Enter

# Attach to session named "$sessname"
#tmux attach -t "$sessname"

monitoring.py

see here: https://gist.github.com/donrestarone/527293a0a8779dcfbb0311b3add6e78e#file-infraredsenseled-py

monitoring.service

[Unit]
Description=Monitoring service
Wants=network-online.target
After=network-online.target

[Service]
WorkingDirectory=/home/donrestarone
ExecStart=/bin/bash /home/donrestarone/monitoring.sh
Type=forking
User=donrestarone
[Install]
WantedBy=multi-user.target
  • don't forget to start and enable it! sudo systemctl start monitoring.service && sudo systemctl enable monitoring.service

copying captured images over to your local computer

scp [email protected]:/home/donrestarone/motion_captures/motion-capture1737318141.jpg ./

tuning:

IMG_7198

@donrestarone
Copy link
Author

improve dropbox connection: https://developers.dropbox.com/oauth-guide

[REMOTE] performed incident reporting at 2025-01-26 09:21:21.410273
Unable to refresh access token without                 refresh token and app key
Traceback (most recent call last):
  File "/home/donrestarone/monitoring.py", line 283, in <module>
    loop()
  File "/home/donrestarone/monitoring.py", line 64, in loop
    detect_and_capture_dropbox_freeable_space(client)
  File "/home/donrestarone/monitoring.py", line 244, in detect_and_capture_dropbox_freeable_space
    allocation = dbx.users_get_space_usage().allocation
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/dropbox/base.py", line 5855, in users_get_space_usage
    r = self.request(
        ^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 326, in request
    res = self.request_json_string_with_retry(host,
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 476, in request_json_string_with_retry
    return self.request_json_string(host,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 596, in request_json_string
    self.raise_dropbox_error_for_resp(r)
  File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 639, in raise_dropbox_error_for_resp
    raise AuthError(request_id, err)
dropbox.exceptions.AuthError: AuthError('254810fb73a442618467ce79e1541cdb', AuthError('expired_access_token', None))

@donrestarone
Copy link
Author

donrestarone commented Feb 2, 2025

previous recording command:
translates to: ffmpeg -f v4l2 -framerate 30 -i /dev/video0 -vcodec libx264 -frames 100 test.mp4

                        subprocess.call([
                            'ffmpeg',
                            '-f', 'v4l2',
                            '-framerate', '30',
                            '-i', '/dev/video0',
                            '-vcodec', 'libx264',
                            '-frames', '100',
                            # ENSURE THIS PATH EXISTS!
                            video_file_path,
                        ], timeout=capture_process_timeout)

to record audio and video at the same time in ffmpeg:

ffmpeg -ar 44100 -ac 1 -f alsa -i hw:2 -f v4l2 -c:v h264 -r 30 -s 1920x1080 -itsoffset 0.5 -i /dev/video0 -t 15 -copyinkf -codec:v copy -codec:a aac -ab 128k -g 10 test.mp4

adapted to existing bash command:

ffmpeg -ar 44100 -ac 1 -f alsa -i hw:2 -f v4l2 -framerate 30 -i /dev/video0 -vcodec libx264 -frames 100 test.mp4

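note: hw:2 is the ALSA card number of the USB microphone. lsusb (below) only confirms that the audio device is attached; run arecord -l to see the card number to use.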

eg:

Bus 004 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub
Bus 003 Device 002: ID 08bb:2902 Texas Instruments PCM2902 Audio Codec
Bus 003 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub
Bus 002 Device 002: ID 1532:0e05 Razer USA, Ltd Razer Kiyo Pro
Bus 002 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub
Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub

source: https://raspberrypi.stackexchange.com/questions/49946/best-ffmpeg-configuration-for-recording-videoaudio-with-raspberrypi-and-usb-web

@donrestarone
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment