-
-
Save donrestarone/527293a0a8779dcfbb0311b3add6e78e to your computer and use it in GitHub Desktop.
raspberry pi infrared sensor activated LED 'security light' - custom
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
######################################################################## | |
# Filename : SenseLED.py | |
# Description : Control led with infrared Motion sensor. | |
# author : www.freenove.com | |
# modification: @donrestarone | |
######################################################################## | |
from gpiozero import LED,MotionSensor | |
import time | |
import datetime | |
from datetime import timezone | |
import subprocess | |
import psutil | |
import os | |
import requests | |
import json | |
import pathlib | |
import dropbox | |
# --- hardware setup -------------------------------------------------------
ledPin = 18  # pin number handed to gpiozero for the indicator LED
sensorPin = 17  # pin number handed to gpiozero for the infrared motion sensor
led = LED(ledPin)
sensor = MotionSensor(sensorPin)

# Announce the boot BEFORE blocking on the sensor: the wait below does not
# return until the sensor reports no motion, so printing afterwards would hide
# the "please stand by" notice for exactly the period it is meant to cover.
timestamp = datetime.datetime.now()
print("[INFO] booting sensor system, please stand by.. %s" % (timestamp))
sensor.wait_for_no_motion()

# --- server connection settings --------------------------------------------
# All three variables are required; a missing one raises KeyError at import.
api_key = os.environ['MONITORING_API_KEY']
api_base_url = os.environ['MONITORING_API_ENDPOINT']
this_client_id = os.environ['MONITORING_CLIENT_ID']

# Module-level placeholders kept for backward compatibility; loop() works on
# its own local variables of the same names.
client = None
motion_message = None
video_file_path = None
def _capture_motion_video(video_file_path, mic_hardware_address, timeout):
    """Record a 5 second audio+video clip with ffmpeg and return its exit status.

    Audio is read from the ALSA device *mic_hardware_address* and video from
    /dev/video0; the result is written to *video_file_path*.
    Raises subprocess.TimeoutExpired if ffmpeg runs longer than *timeout* seconds.
    """
    return subprocess.call([
        'ffmpeg',
        '-ar', '44100',
        '-ac', '1',
        '-f', 'alsa',
        '-i', mic_hardware_address,
        '-f', 'v4l2',
        '-c:v', 'h264',
        '-r', '30',
        '-s', '1920x1080',
        '-itsoffset', '0.5',
        '-i', '/dev/video0',
        '-t', '5',
        '-copyinkf',
        '-codec:v', 'copy',
        '-codec:a', 'aac',
        '-ab', '128k',
        '-g', '10',
        # ENSURE THIS PATH EXISTS!
        video_file_path,
    ], timeout=timeout)


def loop():
    """Run the monitoring service forever.

    On start-up: fetch the client configuration, send a health check, a
    dynamic-DNS ping and a "system booted" incident, then try once to free
    Dropbox space.

    Afterwards, poll the motion sensor. On a rising edge (no motion -> motion)
    the LED is switched on and, when the remote client is both enabled and
    armed, a video clip is recorded, an incident is reported and the clip is
    uploaded to Dropbox (falling back to violet rails). On a falling edge the
    LED is switched off.

    A failure while capturing or while reporting triggers `sudo reboot`.
    """
    previousstate = False
    capture_process_timeout = 60  # in seconds

    timestamp = datetime.datetime.now()
    print("[INFO] booted sensor system, OK %s" % (timestamp))
    report_json = {
        'evidence': '[INFO] system booted',
        'triggered_by': 'system',
        'evidence_format': 'string',
        'evidence_metadata': {
            'timestamp_unix': str(timestamp.timestamp()),
            'timestamp_local': str(timestamp)
        },
    }

    # initialize client <> server connection
    client = get_client_configuration()
    perform_client_health_check(client, timestamp)
    if client:
        ping_dynamic_dns(client)
        perform_incident_reporting(client, timestamp, report_json)
    else:
        # Without a configuration the calls above would fail on client['id'];
        # keep running as a local-only motion light instead of crashing.
        print('[REMOTE ERROR] no client configuration available, skipping boot reporting')
    try:
        # attempt to free up dropbox space once per boot
        detect_and_capture_dropbox_freeable_space(client)
    except Exception as e:
        print('[REMOTE ERROR] could not connect to dropbox due to: %s' %(e))

    while True:
        # make sure to not run out of storage
        detect_and_capture_freeable_space()
        # Read sensor state
        currentstate = sensor.motion_detected
        # NOTE(review): the configuration is re-fetched and a health check is
        # sent on every iteration, so the effective polling rate is bounded by
        # HTTP latency rather than the 10 ms sleep below - consider throttling.
        client = get_client_configuration()
        # Use the current time: reusing the timestamp of the last motion event
        # would report a stale last_health_check while the room is idle.
        perform_client_health_check(client, datetime.datetime.now())

        # If the sensor is triggered
        if currentstate and not previousstate:
            led.on()
            timestamp = datetime.datetime.now()
            if client:
                properties = client['attributes']['properties']
                remote_monitoring_active = properties['enabled'] and properties['armed']
            else:
                # covers both a failed fetch (False) and a missing value (None)
                remote_monitoring_active = False

            if remote_monitoring_active:
                ensure_video_camera_initialized()
            else:
                print('[INFO] remote monitoring not enabled')
            motion_message = "[ALERT] Motion Detected at %s" % (timestamp)
            print(motion_message)

            if remote_monitoring_active:
                # hooks
                print("[INFO] enabled and armed, running hooks at %s" % (timestamp))
                unix_timestamp = timestamp.timestamp()
                video_file_path = '/home/donrestarone/motion_captures/motion-capture%03d.mp4' % unix_timestamp
                # ensure that there is a mic connected here via `lsusb`
                mic_hardware_address = 'hw:2'
                try:
                    # take video (blocks until ffmpeg exits or times out)
                    print('[INFO] capturing video file: %s' % video_file_path)
                    exit_status = _capture_motion_video(
                        video_file_path, mic_hardware_address, capture_process_timeout)
                    if exit_status == 0:
                        print('[INFO] captured video file OK: %s' % video_file_path)
                    else:
                        print('[ERROR] ffmpeg exited with status %s for video file: %s' % (exit_status, video_file_path))
                except Exception as e:
                    print('[ERROR] saving video file: %s failed due to: %s' %(video_file_path, e))
                    os.system("sudo reboot")
                # post incident
                try:
                    report_json = {
                        'evidence': motion_message,
                        'triggered_by': 'infrared-motion',
                        'evidence_format': 'string',
                        'evidence_metadata': {
                            'timestamp_unix': str(unix_timestamp),
                            'timestamp_local': str(timestamp)
                        },
                    }
                    perform_incident_reporting(client, timestamp, report_json)
                    try:
                        upload_to_dropbox(client, timestamp, report_json, video_file_path)
                    except Exception as e:
                        print('[REMOTE ERROR] could not upload to dropbox due to: %s' %(e))
                        # send to violet rails instead
                        upload_video_file_to_violet_rails(client, timestamp, video_file_path)
                except Exception as e:
                    print('[REMOTE ERROR] could not report due to: %s' %(e))
                    os.system("sudo reboot")
            # Record previous state
            previousstate = True
        # If the sensor has returned to ready state
        elif not currentstate and previousstate:
            led.off()
            timestamp = datetime.datetime.now()
            print("[INFO] No Motion Detected at %s" % (timestamp))
            previousstate = False
        # Wait for 10 milliseconds
        time.sleep(0.01)
def destroy():
    """Release the GPIO devices: the LED first, then the motion sensor."""
    for device in (led, sensor):
        device.close()
def ensure_video_camera_initialized():
    """Reboot the system unless the expected camera shows up in `v4l2-ctl --list-devices`.

    Introduced to recover from camera hardware crashes.

    Raises:
        subprocess.CalledProcessError: if `v4l2-ctl` exits non-zero. Per the
            original author's note this is what happens when /dev/video0
            cannot be opened, so a missing camera fails loudly here instead
            of causing a bootloop.
    """
    # ENSURE THIS IS THE CAMERA MODEL, OTHERWISE THE SYSTEM WILL BOOTLOOP!
    camera_model = "Razer Kiyo Pro"
    raw_listing = subprocess.check_output(['v4l2-ctl', '--list-devices'])
    # check_output returns bytes; decode it rather than matching against the
    # "b'...'" repr that str() would produce.
    device_listing = raw_listing.decode('utf-8', errors='replace')
    if camera_model in device_listing:
        print("[INFO] camera OK")
    else:
        # say why before rebooting so the log explains the restart
        print("[ERROR] camera '%s' not found, rebooting" % (camera_model))
        os.system("sudo reboot")
def detect_and_capture_freeable_space():
    """Delete the recorded captures when the root filesystem is more than 90% full.

    Removes the regular, non-hidden files directly inside
    /home/donrestarone/motion_captures (the set a shell `rm dir/*` would
    target). Sub-directories are left alone and per-file errors are logged
    without aborting the clean-up.
    """
    hdd = psutil.disk_usage('/')
    if hdd.percent <= 90:
        return
    print("[WARN] running out of storage, freeing space now")
    # ENSURE THIS PATH EXISTS!
    capture_dir = pathlib.Path('/home/donrestarone/motion_captures')
    # The previous implementation ran `rm <dir>/*` without a shell, so the
    # `*` was passed to rm literally and nothing was ever deleted.
    for entry in capture_dir.glob('*'):
        if entry.name.startswith('.') or not entry.is_file():
            continue
        try:
            entry.unlink()
        except OSError as e:
            print('[ERROR] could not delete %s: %s' % (entry, e))
    print("[INFO] storage space cleared in /home/donrestarone/motion_captures/* - OK")
# server connection code
def get_client_configuration():
    """Fetch this device's monitoring-client record from the server.

    Returns:
        The client dict whose `client_uuid` property equals `this_client_id`,
        or False when the record cannot be fetched, cannot be found, or the
        client is disabled (in which case a reboot is requested first).
    """
    try:
        # TODO fix security vulnerability - this endpoint returns all monitoring clients, change violet rails code to allow for scoping by api_key_only
        response = requests.get(
            '%s/api/1/monitoring_clients' % (api_base_url),
            headers={"Authorization": 'Bearer %s' % (api_key)},
            # never let a hung connection freeze the sensor loop
            timeout=30,
        )
        clients_struct = json.loads(response.content)['data']
        # filter to correct client, again this allows for each client to access clients they are not supposed to!
        this_client = next(
            (c for c in clients_struct
             if c['attributes']['properties']['client_uuid'] == this_client_id),
            None,
        )
        if this_client is None:
            print('[REMOTE ERROR] could not get client configuration: no client with uuid %s' % (this_client_id))
            return False
        # handle system shut off, while enabled: false the system will bootloop waiting for a wake signal!
        if this_client['attributes']['properties']['enabled'] == False:
            print("[REMOTE] client disabled, rebooting")
            os.system("sudo reboot")
            # explicit falsy result instead of an implicit None while the
            # reboot is pending
            return False
        return this_client
    except Exception as e:
        print('[REMOTE ERROR] could not get client configuration: %s' %(e))
        return False
def perform_client_health_check(this_client, timestamp):
    """PATCH the client's record on the server with a fresh `last_health_check`.

    Args:
        this_client: client dict as returned by get_client_configuration(),
            or a falsy value (False/None) when no configuration is available.
        timestamp: value stored (via str()) as `last_health_check`.

    Returns:
        False when the check is skipped (no client, or client disabled);
        otherwise None. Request errors are logged, not raised.
    """
    if not this_client:
        return False
    properties = this_client['attributes']['properties']
    if properties['enabled'] == False:
        return False
    client_id = this_client['id']
    endpoint = '%s/api/1/monitoring_clients/edit/%s' % (api_base_url, client_id)
    # construct json object with existing keys because violet rails clobbers existing keys on update TODO fix this behavior in the server
    # (work on a copy so the caller's client dict is not modified)
    attributes_json = dict(properties)
    attributes_json['last_health_check'] = str(timestamp)
    try:
        response = requests.patch(
            endpoint,
            json={'data': attributes_json},
            headers={"Authorization": 'Bearer %s' % (api_key)},
            timeout=30,
        )
        if response.status_code == 200:
            print("[REMOTE] performed client health check at %s" % (timestamp))
    except Exception as e:
        print('[REMOTE ERROR] could not perform client health check: %s' %(e))
def perform_incident_reporting(this_client, timestamp, report_json):
    """Create a monitoring incident on the server for this client.

    The incident schema is downloaded from the server first and then filled
    in, so the reporting structure can change server-side.

    Args:
        this_client: client dict; only its 'id' is used.
        timestamp: used in the log line only.
        report_json: dict providing 'evidence', 'triggered_by',
            'evidence_format' and 'evidence_metadata'.

    Raises:
        Whatever `requests` / JSON decoding / key lookup raises; callers
        decide how to react.
    """
    client_id = this_client['id']
    auth_headers = {"Authorization": 'Bearer %s' % (api_key)}
    schema_endpoint = '%s/api/1/monitoring_incidents/describe' %(api_base_url)
    # allows reporting schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
    schema_response = requests.get(schema_endpoint, headers=auth_headers, timeout=30)
    json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
    # write incident to latest schema
    json_schema['monitoring_client_id'] = client_id
    for key in ('evidence', 'triggered_by', 'evidence_format', 'evidence_metadata'):
        json_schema[key] = report_json[key]
    endpoint = '%s/api/1/monitoring_incidents' %(api_base_url)
    incident_response = requests.post(endpoint, json={'data': json_schema}, headers=auth_headers, timeout=30)
    # only claim success when the server accepted the incident
    if incident_response.ok:
        print("[REMOTE] performed incident reporting at %s" % (timestamp))
    else:
        print('[REMOTE ERROR] incident reporting failed with HTTP status %s' % (incident_response.status_code))
def upload_to_dropbox(this_client, timestamp, report_json, video_file_path):
    """Upload the recorded video to Dropbox under `/<dropbox_app_key>/<file name>`.

    Credentials come from the client's properties. An existing remote file of
    the same name is overwritten. `timestamp` and `report_json` are accepted
    but not used. Errors from the Dropbox SDK or from opening the file
    propagate to the caller.

    NOTE(review): a recorded run of this script failed with
    'expired_access_token' / "Unable to refresh access token without refresh
    token and app key" - the stored access token appears to be short-lived.
    """
    properties = this_client['attributes']['properties']
    access_token = properties['dropbox_api_key']
    app_key = properties['dropbox_app_key']
    app_secret = properties['dropbox_app_secret']

    source = pathlib.Path(video_file_path)
    file_name = video_file_path.rsplit('/', 1)[-1]
    destination = '/'.join(('', app_key, file_name))

    dbx = dropbox.Dropbox(oauth2_access_token=access_token, app_key=app_key, app_secret=app_secret)
    with source.open("rb") as handle:
        print('[INFO] uploading video: %s to dropbox' %(file_name))
        dbx.files_upload(handle.read(), destination, mode=dropbox.files.WriteMode("overwrite"))
        print('[INFO] completed uploading video: %s to dropbox' %(file_name))
def detect_and_capture_dropbox_freeable_space(this_client):
    """Delete this app's Dropbox folder when less than 32MB of quota remains.

    Credentials come from the client's properties. Errors from the Dropbox
    SDK (or from a client value that is not a dict) propagate to the caller.
    """
    properties = this_client['attributes']['properties']
    dropbox_access_token = properties['dropbox_api_key']
    app_key = properties['dropbox_app_key']
    app_secret = properties['dropbox_app_secret']
    dbx = dropbox.Dropbox(oauth2_access_token=dropbox_access_token, app_key=app_key, app_secret=app_secret)

    # one round-trip gives both the allocation and the used byte count
    space_usage = dbx.users_get_space_usage()
    allocation = space_usage.allocation
    if allocation.is_individual():
        currently_allocated_bytes = allocation.get_individual().allocated
    elif allocation.is_team():
        currently_allocated_bytes = allocation.get_team().allocated
    else:
        # Unknown allocation type: treating it as 0 bytes would make the
        # check below always true and wipe the folder on every boot.
        print('[REMOTE] dropbox allocation type unknown, skipping purge')
        return

    minimum_bytes = 32000000  # 32MB
    if (currently_allocated_bytes - space_usage.used) < minimum_bytes:
        print('[REMOTE] dropbox storage minimums reached, purging old files')
        # removes the whole per-app folder, not individual files
        dbx.files_delete_v2('/' + app_key)
def ping_dynamic_dns(this_client):
    """Report this machine's IP addresses to the server's dynamic-DNS webhook.

    The payload schema is downloaded from the server and then filled in.
    Network/HTTP problems are logged, not raised; a failing `hostname`
    command or a malformed *this_client* still raises.
    """
    client_id = this_client['id']
    uuid = this_client['attributes']['properties']['client_uuid']
    webhook_authorization_key = uuid + '==' + api_key
    endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/dynamic_dns_lookup/webhook' % (api_base_url)
    schema_endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/describe' %(api_base_url)
    # check_output returns bytes: decode and trim so the server receives
    # "192.168.0.5 ..." instead of the repr "b'192.168.0.5 ...\n'".
    # NOTE(review): confirm the server does not rely on the old repr format.
    ips = subprocess.check_output(['hostname', '--all-ip-addresses']).decode('utf-8', errors='replace').strip()
    try:
        # allows ddns schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
        schema_response = requests.get(schema_endpoint, headers={"Authorization": 'Bearer %s' % (api_key)}, timeout=30)
        json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
        # write lookup to latest schema
        json_schema['monitoring_client_id'] = client_id
        json_schema['ip'] = ips
        print(json_schema, 'dns ping')
        print("[INFO] performing DDNS ping")
        response = requests.post(endpoint, json=json_schema, headers={"Authorization": webhook_authorization_key}, timeout=30)
        if response.status_code == 200:
            print("[REMOTE] performed DDNS ping")
    except Exception as e:
        print('[REMOTE ERROR] could not perform DDNS ping: %s' %(e))
def upload_video_file_to_violet_rails(this_client, timestamp, video_file_path):
    """Upload the recorded video to the violet rails incident webhook.

    Used as the fallback when the Dropbox upload fails. The incident schema
    is downloaded from the server and then filled in.

    Args:
        this_client: client dict; 'id' and the 'client_uuid' property are used.
        timestamp: stored (via str()) as `local_time_of_recording`.
        video_file_path: path of the local video file to send.

    Raises:
        Errors while fetching/decoding the schema propagate; errors during
        the upload itself are logged, not raised.
    """
    client_id = this_client['id']
    uuid = this_client['attributes']['properties']['client_uuid']
    webhook_authorization_key = uuid + '==' + api_key
    endpoint = '%s/api/1/monitoring_incidents/video_upload/webhook' % (api_base_url)
    schema_endpoint = '%s/api/1/monitoring_incidents/describe' %(api_base_url)
    # allows incident schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
    schema_response = requests.get(schema_endpoint, headers={"Authorization": 'Bearer %s' % (api_key)}, timeout=30)
    json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
    # write incident to latest schema
    json_schema['monitoring_client_id'] = client_id
    json_schema['evidence'] = 'see video file'
    json_schema['triggered_by'] = 'infrared-motion'
    json_schema['evidence_format'] = 'file'
    json_schema['local_file_name'] = video_file_path
    json_schema['local_time_of_recording'] = str(timestamp)
    print("[INFO] uploading evidence file to violet rails")
    try:
        # context manager so the file handle is closed even if the POST fails
        with open(video_file_path, 'rb') as video_file:
            response = requests.post(
                endpoint,
                files={'file': video_file},
                data=json_schema,
                headers={"Authorization": webhook_authorization_key},
                # generous limit: this request carries the whole video
                timeout=300,
            )
        print(response.content, 'VR upload')
        if response.status_code == 200:
            print("[INFO] uploaded evidence file to violet rails")
    except Exception as e:
        print('[REMOTE ERROR] could not upload evidence file to violet rails: %s' %(e))
def main():
    """Run the monitoring loop; on Ctrl-C release the GPIO devices and say goodbye."""
    print ('[INFO] Program is starting...')
    try:
        loop()
    except KeyboardInterrupt:  # Press ctrl-c to end the program.
        destroy()
        print("[INFO] Ending program")


if __name__ == '__main__':  # Program entrance
    main()
improve dropbox connection: https://developers.dropbox.com/oauth-guide
[REMOTE] performed incident reporting at 2025-01-26 09:21:21.410273
Unable to refresh access token without refresh token and app key
Traceback (most recent call last):
File "/home/donrestarone/monitoring.py", line 283, in <module>
loop()
File "/home/donrestarone/monitoring.py", line 64, in loop
detect_and_capture_dropbox_freeable_space(client)
File "/home/donrestarone/monitoring.py", line 244, in detect_and_capture_dropbox_freeable_space
allocation = dbx.users_get_space_usage().allocation
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3/dist-packages/dropbox/base.py", line 5855, in users_get_space_usage
r = self.request(
^^^^^^^^^^^^^
File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 326, in request
res = self.request_json_string_with_retry(host,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 476, in request_json_string_with_retry
return self.request_json_string(host,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 596, in request_json_string
self.raise_dropbox_error_for_resp(r)
File "/usr/lib/python3/dist-packages/dropbox/dropbox_client.py", line 639, in raise_dropbox_error_for_resp
raise AuthError(request_id, err)
dropbox.exceptions.AuthError: AuthError('254810fb73a442618467ce79e1541cdb', AuthError('expired_access_token', None))
previous recording command:
translates to: ffmpeg -f v4l2 -framerate 30 -i /dev/video0 -vcodec libx264 -frames 100 test.mp4
subprocess.call([
'ffmpeg',
'-f', 'v4l2',
'-framerate', '30',
'-i', '/dev/video0',
'-vcodec', 'libx264',
'-frames', '100',
# ENSURE THIS PATH EXISTS!
video_file_path,
], timeout=capture_process_timeout)
to record audio and video at the same time in ffmpeg:
ffmpeg -ar 44100 -ac 1 -f alsa -i hw:2 -f v4l2 -c:v h264 -r 30 -s 1920x1080 -itsoffset 0.5 -i /dev/video0 -t 15 -copyinkf -codec:v copy -codec:a aac -ab 128k -g 10 test.mp4
adapted to existing bash command:
ffmpeg -ar 44100 -ac 1 -f alsa -i hw:2 -f v4l2 -framerate 30 -i /dev/video0 -vcodec libx264 -frames 100 test.mp4
note hw:2 is the hardware number from lsusb
eg:
Bus 004 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub
Bus 003 Device 002: ID 08bb:2902 Texas Instruments PCM2902 Audio Codec
Bus 003 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub
Bus 002 Device 002: ID 1532:0e05 Razer USA, Ltd Razer Kiyo Pro
Bus 002 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub
Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
readme
original script: https://gist.github.com/donrestarone/e2211040b94e0072e078d5d8ad201c65
first start
dropbox setup!
to ensure dropbox file upload works, create an app and use the App key, App secret, and Generated access token (oauth2 API key) that is given there.
sudo apt install python3-dropbox
want a display?
install the Freenove LCD and follow setup guide here: https://gist.github.com/donrestarone/812dcd467b0df8af6a13f398156c3a3b
server connection:
ensure that API endpoint, client API key and client ID are in the raspberry pi environment
ensure the motion_captures folder exists at the ~ level, e.g. /home/donrestarone/motion_captures
sudo apt-get install tmux
at the ~ level, create a bash script named monitoring.sh (don't forget to chmod +x monitoring.sh to make it executable) that starts tmux and kicks off the python monitoring service (monitoring.py)
at the ~ level, copy and paste the python source above into the python script monitoring.py
ensure monitoring.service exists at /etc/systemd/system/monitoring.service
start and enable monitoring.service by running: sudo systemctl start monitoring.service && sudo systemctl enable monitoring.service
monitoring.sh
monitoring.py
see here: https://gist.github.com/donrestarone/527293a0a8779dcfbb0311b3add6e78e#file-infraredsenseled-py
monitoring.service
sudo systemctl start monitoring.service && sudo systemctl enable monitoring.service
copying captured images over to your local computer
tuning: