Skip to content

Instantly share code, notes, and snippets.

@donrestarone
Last active February 3, 2025 23:28
Show Gist options
  • Save donrestarone/527293a0a8779dcfbb0311b3add6e78e to your computer and use it in GitHub Desktop.
Save donrestarone/527293a0a8779dcfbb0311b3add6e78e to your computer and use it in GitHub Desktop.
raspberry pi infrared sensor activated LED 'security light' - custom
#!/usr/bin/env python3
########################################################################
# Filename : SenseLED.py
# Description : Control led with infrared Motion sensor.
# auther : www.freenove.com
# modification: @donrestarone
########################################################################
from gpiozero import LED,MotionSensor
import time
import datetime
from datetime import timezone
import subprocess
import psutil
import os
import requests
import json
import pathlib
import dropbox
timestamp = datetime.datetime.now()
ledPin = 18 # define ledPin
sensorPin = 17 # define sensorPin
led = LED(ledPin)
sensor = MotionSensor(sensorPin)
sensor.wait_for_no_motion()
print("[INFO] booting sensor system, please stand by.. %s" % (timestamp))
api_key = os.environ['MONITORING_API_KEY']
api_base_url = os.environ['MONITORING_API_ENDPOINT']
this_client_id = os.environ['MONITORING_CLIENT_ID']
client = None
motion_message = None
video_file_path = None
def loop():
# Variables to hold the current and last states
currentstate = False
previousstate = False
timestamp = datetime.datetime.now()
capture_process_timeout = 60 # in seconds
print("[INFO] booted sensor system, OK %s" % (timestamp))
is_currently_capturing_video = False
unix_timestamp = timestamp.timestamp()
timestamp = datetime.datetime.now()
report_json = {
'evidence': '[INFO] system booted',
'triggered_by': 'system',
'evidence_format': 'string',
'evidence_metadata': {
'timestamp_unix': str(unix_timestamp),
'timestamp_local': str(timestamp)
},
}
# initialize client <> server connection
client = get_client_configuration()
perform_client_health_check(client, timestamp)
ping_dynamic_dns(client)
perform_incident_reporting(client, timestamp, report_json)
try:
# attempt to free up dropbox space once per boot
detect_and_capture_dropbox_freeable_space(client)
except Exception as e:
print('[REMOTE ERROR] could not connect to dropbox due to: %s' %(e))
# clear violet rails space instead
while True:
# make sure to not run out of storage
detect_and_capture_freeable_space()
# Read sensor state
currentstate = sensor.motion_detected
client = get_client_configuration()
perform_client_health_check(client, timestamp)
# If the sensor is triggered
if currentstate == True and previousstate == False:
led.on()
timestamp = datetime.datetime.now()
client_enabled = client['attributes']['properties']['enabled'] if client != False else False
client_armed = client['attributes']['properties']['armed'] if client != False else False
if client_enabled and client_armed:
ensure_video_camera_initialized()
else:
print('[INFO] remote monitoring not enabled')
motion_message = "[ALERT] Motion Detected at %s" % (timestamp)
print(motion_message)
if client_enabled and client_armed:
# hooks
print("[INFO] enabled and armed, running hooks at %s" % (timestamp))
unix_timestamp = timestamp.timestamp()
video_file_path = '/home/donrestarone/motion_captures/motion-capture%03d.mp4' % unix_timestamp
# ensure that there is a mic connected here via `lsusb`
mic_hardware_address = 'hw:2'
try:
if is_currently_capturing_video == False:
# take video
print('[INFO] capturing video file: %s' % video_file_path)
is_currently_capturing_video = True
subprocess.call([
'ffmpeg',
'-ar', '44100',
'-ac', '1',
'-f', 'alsa',
'-i', mic_hardware_address,
'-f', 'v4l2',
'-c:v', 'h264',
'-r', '30',
'-s', '1920x1080',
'-itsoffset', '0.5',
'-i', '/dev/video0',
'-t', '5',
'-copyinkf',
'-codec:v', 'copy',
'-codec:a', 'aac',
'-ab', '128k',
'-g', '10',
# ENSURE THIS PATH EXISTS!
video_file_path,
], timeout=capture_process_timeout)
print('[INFO] captured video file OK: %s' % video_file_path)
is_currently_capturing_video = False
except Exception as e:
is_currently_capturing_video = False
print('[ERROR] saving video file: %s failed due to: %s' %(video_file_path, e))
os.system("sudo reboot")
# post incident
try:
report_json = {
'evidence': motion_message,
'triggered_by': 'infrared-motion',
'evidence_format': 'string',
'evidence_metadata': {
'timestamp_unix': str(unix_timestamp),
'timestamp_local': str(timestamp)
},
}
perform_incident_reporting(client, timestamp, report_json)
try:
upload_to_dropbox(client, timestamp, report_json, video_file_path)
except Exception as e:
print('[REMOTE ERROR] could not upload to dropbox due to: %s' %(e))
# send to violet rails instead
upload_video_file_to_violet_rails(client, timestamp, video_file_path)
except Exception as e:
print('[REMOTE ERROR] could not report due to: %s' %(e))
os.system("sudo reboot")
# Record previous state
previousstate = True
# If the sensor has returned to ready state
elif currentstate == False and previousstate == True:
led.off()
timestamp = datetime.datetime.now()
print("[INFO] No Motion Detected at %s" % (timestamp))
previousstate = False
# Wait for 10 milliseconds
time.sleep(0.01)
def destroy():
led.close()
sensor.close()
def ensure_video_camera_initialized():
# ENSURE THIS IS THE CAMERA MODEL, OTHERWISE THE SYSTEM WILL BOOTLOOP! - INTRODUCED TO REBOOT IN CASE CAMERA HARDWARE CRASHES
camera_model = "Razer Kiyo Pro"
# this fails loudly if Cannot open device /dev/video0, exiting - so not having any camera plugged-in will not cause a bootloop
if camera_model in str(subprocess.check_output(['v4l2-ctl', '--list-devices'])):
print("[INFO] camera OK")
else:
os.system("sudo reboot")
def detect_and_capture_freeable_space():
hdd = psutil.disk_usage('/')
if hdd.percent > 90:
print("[WARN] running out of storage, freeing space now")
subprocess.call([
'rm',
# ENSURE THIS PATH EXISTS!
'/home/donrestarone/motion_captures/*',
])
print("[INFO] storage space cleared in /home/donrestarone/motion_captures/* - OK")
# server connection code
def get_client_configuration():
try:
# TODO fix security vulnerability - this endpoint returns all monitoring clients, change violet rails code to allow for scoping by api_key_only
monitoring_clients = requests.get('%s/api/1/monitoring_clients' % (api_base_url), headers={"Authorization": 'Bearer %s' % (api_key) })
clients_struct = json.loads(monitoring_clients.content)['data']
# filter to correct client, again this allows for each client to access clients they are not supposed to!
this_client = list(filter(lambda c: c['attributes']['properties']['client_uuid'] == this_client_id, clients_struct))[0]
# handle system shut off, while enabled: false the system will bootloop waiting for a wake signal!
if this_client['attributes']['properties']['enabled'] == False:
print("[REMOTE] client disabled, rebooting")
os.system("sudo reboot")
else:
return this_client
except Exception as e:
print('[REMOTE ERROR] could not get client configuration: %s' %(e))
return False
def perform_client_health_check(this_client, timestamp):
if this_client == False:
return False
id = this_client['id']
endpoint = '%s/api/1/monitoring_clients/edit/%s' % (api_base_url, id)
# construct json object with existing keys because violet rails clobbers existing keys on update TODO fix this behavior in the server
attributes_json = this_client['attributes']['properties']
attributes_json['last_health_check'] = str(timestamp)
if this_client['attributes']['properties']['enabled'] == False:
return False
try:
response = requests.patch(endpoint, json={'data': attributes_json}, headers={"Authorization": 'Bearer %s' % (api_key) })
if response.status_code == 200:
print("[REMOTE] performed client health check at %s" % (timestamp))
except Exception as e:
print('[REMOTE ERROR] could not perform client health check: %s' %(e))
def perform_incident_reporting(this_client, timestamp, report_json):
id = this_client['id']
schema_endpoint = '%s/api/1/monitoring_incidents/describe' %(api_base_url)
# allows reporting schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
schema_response = requests.get(schema_endpoint, headers={"Authorization": 'Bearer %s' % (api_key) })
json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
# write incident to latest schema
json_schema['monitoring_client_id'] = id
json_schema['evidence'] = report_json['evidence']
json_schema['triggered_by'] = report_json['triggered_by']
json_schema['evidence_format'] = report_json['evidence_format']
json_schema['evidence_metadata'] = report_json['evidence_metadata']
endpoint = '%s/api/1/monitoring_incidents' %(api_base_url)
incident_response = requests.post(endpoint, json={'data': json_schema}, headers={"Authorization": 'Bearer %s' % (api_key) })
print("[REMOTE] performed incident reporting at %s" % (timestamp))
def upload_to_dropbox(this_client, timestamp, report_json, video_file_path):
dropbox_access_token = this_client['attributes']['properties']['dropbox_api_key']
app_key = this_client['attributes']['properties']['dropbox_app_key']
app_secret = this_client['attributes']['properties']['dropbox_app_secret']
filepath = pathlib.Path(video_file_path)
file_name = video_file_path.split('/')[-1]
targetfile = '/' + app_key + '/' + file_name
d = dropbox.Dropbox(oauth2_access_token=dropbox_access_token, app_key=app_key, app_secret=app_secret)
with filepath.open("rb") as f:
print('[INFO] uploading video: %s to dropbox' %(file_name))
meta = d.files_upload(f.read(), targetfile, mode=dropbox.files.WriteMode("overwrite"))
print('[INFO] completed uploading video: %s to dropbox' %(file_name))
def detect_and_capture_dropbox_freeable_space(this_client):
dropbox_access_token = this_client['attributes']['properties']['dropbox_api_key']
app_key = this_client['attributes']['properties']['dropbox_app_key']
app_secret = this_client['attributes']['properties']['dropbox_app_secret']
dbx = dropbox.Dropbox(oauth2_access_token=dropbox_access_token, app_key=app_key, app_secret=app_secret)
allocation = dbx.users_get_space_usage().allocation
currently_allocated_bytes = 0
minimum_bytes = 32000000 # 32MB
currently_used_storage_bytes = dbx.users_get_space_usage().used
if allocation.is_individual():
currently_allocated_bytes = allocation.get_individual().allocated
elif allocation.is_team():
currently_allocated_bytes = allocation.get_team().allocated
if (currently_allocated_bytes - currently_used_storage_bytes) < minimum_bytes:
print('[REMOTE] dropbox storage minimums reached, purging old files')
wildcard_path = '/' + app_key
dbx.files_delete_v2(wildcard_path)
def ping_dynamic_dns(this_client):
id = this_client['id']
uuid = this_client['attributes']['properties']['client_uuid']
webhook_authorization_key = uuid + '==' + api_key
endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/dynamic_dns_lookup/webhook' % (api_base_url)
ips = subprocess.check_output(['hostname', '--all-ip-addresses'])
schema_endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/describe' %(api_base_url)
# allows ddns schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
schema_response = requests.get(schema_endpoint, headers={"Authorization": 'Bearer %s' % (api_key) })
json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
# write incident to latest schema
json_schema['monitoring_client_id'] = id
json_schema['ip'] = str(ips)
print(json_schema, 'dns ping')
print("[INFO] performing DDNS ping")
try:
response = requests.post(endpoint, json=json_schema, headers={"Authorization": webhook_authorization_key })
if response.status_code == 200:
print("[REMOTE] performed DDNS ping")
except Exception as e:
print('[REMOTE ERROR] could not perform DDNS ping: %s' %(e))
def upload_video_file_to_violet_rails(this_client, timestamp, video_file_path):
id = this_client['id']
uuid = this_client['attributes']['properties']['client_uuid']
webhook_authorization_key = uuid + '==' + api_key
endpoint = '%s/api/1/monitoring_incidents/video_upload/webhook' % (api_base_url)
schema_endpoint = '%s/api/1/monitoring_incidents/describe' %(api_base_url)
# allows ddns schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
schema_response = requests.get(schema_endpoint, headers={"Authorization": 'Bearer %s' % (api_key) })
json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
# write incident to latest schema
json_schema['monitoring_client_id'] = id
json_schema['evidence'] = 'see video file'
json_schema['triggered_by'] = 'infrared-motion'
json_schema['evidence_format'] = 'file'
json_schema['local_file_name'] = video_file_path
json_schema['local_time_of_recording'] = str(timestamp)
print("[INFO] uploading evidence file to violet rails")
try:
filepath = pathlib.Path(video_file_path)
files = {'file': open(video_file_path,'rb')}
file_name = video_file_path.split('/')[-1]
response = requests.post(endpoint, files=files, data=json_schema, headers={"Authorization": webhook_authorization_key })
print(response.content, 'VR upload')
if response.status_code == 200:
print("[INFO] uploaded evidence file to violet rails")
except Exception as e:
print('[REMOTE ERROR] could not upload evidence file to violet rails: %s' %(e))
if __name__ == '__main__': # Program entrance
print ('[INFO] Program is starting...')
try:
loop()
except KeyboardInterrupt: # Press ctrl-c to end the program.
destroy()
print("[INFO] Ending program")
@donrestarone
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment