-
-
Save donrestarone/527293a0a8779dcfbb0311b3add6e78e to your computer and use it in GitHub Desktop.
raspberry pi infrared sensor activated LED 'security light' - custom
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
######################################################################## | |
# Filename : SenseLED.py | |
# Description : Control led with infrared Motion sensor. | |
# author : www.freenove.com
# modification: @donrestarone | |
######################################################################## | |
from gpiozero import LED,MotionSensor | |
import time | |
import datetime | |
from datetime import timezone | |
import subprocess | |
import psutil | |
import os | |
import requests | |
import json | |
import pathlib | |
import dropbox | |
# ---------------------------------------------------------------------------
# Module-level setup: executed once when the script starts.
# ---------------------------------------------------------------------------
timestamp = datetime.datetime.now()

# GPIO pin numbers handed to gpiozero for the LED and the motion sensor.
ledPin = 18
sensorPin = 17
led = LED(ledPin)
sensor = MotionSensor(sensorPin)

# Block until the sensor reports no motion before announcing the boot.
sensor.wait_for_no_motion()
print("[INFO] booting sensor system, please stand by.. %s" % timestamp)

# Server credentials are read from the environment; a missing variable
# raises KeyError here.
api_key, api_base_url, this_client_id = (
    os.environ[name]
    for name in (
        'MONITORING_API_KEY',
        'MONITORING_API_ENDPOINT',
        'MONITORING_CLIENT_ID',
    )
)

# Module-level placeholders, initialised empty.
client = motion_message = video_file_path = None
def _build_report(evidence, triggered_by, timestamp):
    """Build the incident payload passed to perform_incident_reporting()."""
    return {
        'evidence': evidence,
        'triggered_by': triggered_by,
        'evidence_format': 'string',
        'evidence_metadata': {
            'timestamp_unix': str(timestamp.timestamp()),
            'timestamp_local': str(timestamp),
        },
    }


def _capture_video(video_file_path, mic_hardware_address, timeout):
    """Run ffmpeg to record audio+video into video_file_path; return its exit code."""
    return subprocess.call([
        'ffmpeg',
        '-ar', '44100',
        '-ac', '1',
        '-f', 'alsa',
        '-i', mic_hardware_address,
        '-f', 'v4l2',
        '-c:v', 'h264',
        '-r', '30',
        '-s', '1920x1080',
        '-itsoffset', '0.5',
        '-i', '/dev/video0',
        '-t', '5',
        '-copyinkf',
        '-codec:v', 'copy',
        '-codec:a', 'aac',
        '-ab', '128k',
        '-g', '10',
        # ENSURE THIS PATH EXISTS!
        video_file_path,
    ], timeout=timeout)


def loop():
    """Poll the motion sensor forever, recording and reporting each detection.

    On start-up the client configuration is fetched and a health check,
    DDNS ping and "system booted" incident are sent. These boot-time calls
    are best-effort: a missing configuration or a failing request is logged
    instead of terminating the program.

    Each iteration then:
      * frees local storage if needed,
      * refreshes the client configuration and sends a health check stamped
        with the current time,
      * on a no-motion -> motion transition switches the LED on and, when the
        client is enabled and armed, records a clip, reports the incident and
        uploads the clip (Dropbox first, Violet Rails as fallback),
      * on a motion -> no-motion transition switches the LED off.

    A failure while capturing or reporting triggers `sudo reboot`.
    This function does not return.
    """
    capture_process_timeout = 60  # in seconds
    # ensure that there is a mic connected here via `lsusb`
    mic_hardware_address = 'hw:2'

    # Variables to hold the current and last states
    currentstate = False
    previousstate = False

    timestamp = datetime.datetime.now()
    print("[INFO] booted sensor system, OK %s" % (timestamp))
    report_json = _build_report('[INFO] system booted', 'system', timestamp)

    # initialize client <> server connection
    client = get_client_configuration()
    if client:
        try:
            perform_client_health_check(client, timestamp)
            ping_dynamic_dns(client)
            perform_incident_reporting(client, timestamp, report_json)
        except Exception as e:
            # A network hiccup at boot must not stop the sensor loop.
            print('[REMOTE ERROR] could not complete boot reporting due to: %s' % (e))
        try:
            # attempt to free up dropbox space once per boot
            detect_and_capture_dropbox_freeable_space(client)
        except Exception as e:
            print('[REMOTE ERROR] could not connect to dropbox due to: %s' % (e))
    else:
        print('[REMOTE ERROR] no client configuration at boot, skipping boot reporting')

    while True:
        # make sure to not run out of storage
        detect_and_capture_freeable_space()
        # Read sensor state
        currentstate = sensor.motion_detected
        client = get_client_configuration()
        if client:
            # Stamp the health check with the time it is actually performed.
            perform_client_health_check(client, datetime.datetime.now())

        # If the sensor is triggered
        if currentstate and not previousstate:
            led.on()
            timestamp = datetime.datetime.now()
            # Truthiness covers both False and None configuration results.
            client_enabled = client['attributes']['properties']['enabled'] if client else False
            client_armed = client['attributes']['properties']['armed'] if client else False
            monitoring_active = client_enabled and client_armed

            if monitoring_active:
                ensure_video_camera_initialized()
            else:
                print('[INFO] remote monitoring not enabled')

            motion_message = "[ALERT] Motion Detected at %s" % (timestamp)
            print(motion_message)

            if monitoring_active:
                # hooks
                print("[INFO] enabled and armed, running hooks at %s" % (timestamp))
                unix_timestamp = timestamp.timestamp()
                video_file_path = '/home/donrestarone/motion_captures/motion-capture%03d.mp4' % unix_timestamp

                # take video
                try:
                    print('[INFO] capturing video file: %s' % video_file_path)
                    exit_code = _capture_video(
                        video_file_path, mic_hardware_address, capture_process_timeout
                    )
                    if exit_code == 0:
                        print('[INFO] captured video file OK: %s' % video_file_path)
                    else:
                        print('[ERROR] ffmpeg exited with status %s while saving video file: %s'
                              % (exit_code, video_file_path))
                except Exception as e:
                    print('[ERROR] saving video file: %s failed due to: %s' % (video_file_path, e))
                    os.system("sudo reboot")

                # post incident
                try:
                    report_json = _build_report(motion_message, 'infrared-motion', timestamp)
                    perform_incident_reporting(client, timestamp, report_json)
                    try:
                        upload_to_dropbox(client, timestamp, report_json, video_file_path)
                    except Exception as e:
                        print('[REMOTE ERROR] could not upload to dropbox due to: %s' % (e))
                        # send to violet rails instead
                        upload_video_file_to_violet_rails(client, timestamp, video_file_path)
                except Exception as e:
                    print('[REMOTE ERROR] could not report due to: %s' % (e))
                    os.system("sudo reboot")

            # Record previous state
            previousstate = True
        # If the sensor has returned to ready state
        elif not currentstate and previousstate:
            led.off()
            timestamp = datetime.datetime.now()
            print("[INFO] No Motion Detected at %s" % (timestamp))
            previousstate = False

        # Wait for 10 milliseconds
        time.sleep(0.01)
def destroy():
    """Close the GPIO devices: the LED first, then the motion sensor."""
    for device in (led, sensor):
        device.close()
def ensure_video_camera_initialized(camera_model="Razer Kiyo Pro"):
    """Reboot the machine if the expected camera is not listed by v4l2-ctl.

    ENSURE `camera_model` IS THE ATTACHED CAMERA MODEL, OTHERWISE THE SYSTEM
    WILL BOOTLOOP! This check was introduced to reboot in case the camera
    hardware crashes.

    Args:
        camera_model: substring looked for in the `v4l2-ctl --list-devices`
            output.

    Raises:
        subprocess.CalledProcessError: if v4l2-ctl exits with a non-zero
            status. Per the original author's note this happens when
            /dev/video0 cannot be opened, so having no camera plugged in
            fails loudly instead of causing a bootloop.
    """
    listing = subprocess.check_output(['v4l2-ctl', '--list-devices'])
    # Decode rather than str(bytes): the bytes repr escapes non-ASCII
    # characters, which would prevent such a model name from ever matching.
    if camera_model in listing.decode('utf-8', errors='replace'):
        print("[INFO] camera OK")
    else:
        print("[ERROR] camera '%s' not found, rebooting" % camera_model)
        os.system("sudo reboot")
def detect_and_capture_freeable_space(capture_dir='/home/donrestarone/motion_captures',
                                      max_used_percent=90):
    """Delete recorded captures when the root filesystem is nearly full.

    Args:
        capture_dir: directory whose files are removed. ENSURE THIS PATH EXISTS!
        max_used_percent: usage percentage of '/' above which files are purged.

    The original implementation ran `rm <dir>/*` without a shell, so the `*`
    was passed to rm literally and no file was ever removed. The files are
    now enumerated and unlinked directly.
    """
    hdd = psutil.disk_usage('/')
    if hdd.percent <= max_used_percent:
        return

    print("[WARN] running out of storage, freeing space now")
    for entry in pathlib.Path(capture_dir).glob('*'):
        # Plain `rm` does not remove directories either; skip them.
        if not entry.is_file():
            continue
        try:
            entry.unlink()
        except OSError as e:
            print('[ERROR] could not remove %s: %s' % (entry, e))
    print("[INFO] storage space cleared in %s/* - OK" % capture_dir)
# server connection code | |
def get_client_configuration():
    """Fetch this device's monitoring-client record from the server.

    Returns:
        The client dict whose `client_uuid` equals `this_client_id`, or
        False when the request fails, the client is not found, or the client
        is disabled (in which case a reboot is also requested).
    """
    try:
        # TODO fix security vulnerability - this endpoint returns all monitoring clients, change violet rails code to allow for scoping by api_key_only
        monitoring_clients = requests.get(
            '%s/api/1/monitoring_clients' % (api_base_url),
            headers={"Authorization": 'Bearer %s' % (api_key)},
            timeout=10,  # never let a stalled connection hang the sensor loop
        )
        clients_struct = json.loads(monitoring_clients.content)['data']
        # filter to correct client, again this allows for each client to access clients they are not supposed to!
        this_client = next(
            (c for c in clients_struct
             if c['attributes']['properties']['client_uuid'] == this_client_id),
            None,
        )
        if this_client is None:
            raise LookupError('no monitoring client with uuid %s' % (this_client_id))
        # handle system shut off, while enabled: false the system will bootloop waiting for a wake signal!
        if this_client['attributes']['properties']['enabled'] == False:
            print("[REMOTE] client disabled, rebooting")
            os.system("sudo reboot")
            # Explicit False (not an implicit None) so callers that test
            # `client != False` do not treat this as a usable client.
            return False
        return this_client
    except Exception as e:
        print('[REMOTE ERROR] could not get client configuration: %s' % (e))
        return False
def perform_client_health_check(this_client, timestamp):
    """PATCH the client's record with a fresh `last_health_check` value.

    Args:
        this_client: client dict as returned by get_client_configuration(),
            or a falsy value (False/None) when no configuration is available.
        timestamp: value stored, via str(), as `last_health_check`.

    Returns:
        False when there is no client or the client is disabled; None
        otherwise (request errors are logged, not raised).
    """
    if not this_client:
        return False
    properties = this_client['attributes']['properties']
    if properties['enabled'] == False:
        return False

    # construct json object with existing keys because violet rails clobbers existing keys on update TODO fix this behavior in the server
    # Work on a copy so the caller's client dict is not modified.
    attributes_json = dict(properties)
    attributes_json['last_health_check'] = str(timestamp)
    endpoint = '%s/api/1/monitoring_clients/edit/%s' % (api_base_url, this_client['id'])
    try:
        response = requests.patch(
            endpoint,
            json={'data': attributes_json},
            headers={"Authorization": 'Bearer %s' % (api_key)},
            timeout=10,  # a stalled server must not block the sensor loop
        )
        if response.status_code == 200:
            print("[REMOTE] performed client health check at %s" % (timestamp))
        elif not response.ok:
            print('[REMOTE ERROR] client health check rejected with status %s' % (response.status_code))
    except Exception as e:
        print('[REMOTE ERROR] could not perform client health check: %s' % (e))
def perform_incident_reporting(this_client, timestamp, report_json):
    """Post an incident to the server using the server-provided schema.

    Args:
        this_client: client dict; its 'id' is sent as `monitoring_client_id`.
        timestamp: only used in the log line.
        report_json: dict providing 'evidence', 'triggered_by',
            'evidence_format' and 'evidence_metadata'.

    Raises:
        Any exception from requests / JSON decoding / missing keys is
        propagated to the caller.
    """
    client_id = this_client['id']
    auth_headers = {"Authorization": 'Bearer %s' % (api_key)}
    request_timeout = 10  # seconds; avoids hanging forever on a dead link

    # allows reporting schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
    schema_endpoint = '%s/api/1/monitoring_incidents/describe' % (api_base_url)
    schema_response = requests.get(schema_endpoint, headers=auth_headers, timeout=request_timeout)
    json_schema = json.loads(schema_response.content)['data']['attributes']['properties']

    # write incident to latest schema
    json_schema['monitoring_client_id'] = client_id
    for key in ('evidence', 'triggered_by', 'evidence_format', 'evidence_metadata'):
        json_schema[key] = report_json[key]

    endpoint = '%s/api/1/monitoring_incidents' % (api_base_url)
    incident_response = requests.post(
        endpoint, json={'data': json_schema}, headers=auth_headers, timeout=request_timeout
    )
    # Only claim success when the server accepted the incident.
    if incident_response.ok:
        print("[REMOTE] performed incident reporting at %s" % (timestamp))
    else:
        print('[REMOTE ERROR] incident reporting rejected with status %s' % (incident_response.status_code))
def upload_to_dropbox(this_client, timestamp, report_json, video_file_path):
    """Upload the video at video_file_path to Dropbox under '/<app_key>/<file name>'.

    Dropbox credentials are read from the client's properties. `timestamp`
    and `report_json` are accepted but not used. An existing remote file of
    the same name is overwritten.
    """
    props = this_client['attributes']['properties']
    access_token = props['dropbox_api_key']
    app_key = props['dropbox_app_key']
    app_secret = props['dropbox_app_secret']

    local_video = pathlib.Path(video_file_path)
    file_name = video_file_path.rsplit('/', 1)[-1]
    remote_path = '/'.join(('', app_key, file_name))

    dbx = dropbox.Dropbox(
        oauth2_access_token=access_token,
        app_key=app_key,
        app_secret=app_secret,
    )
    with local_video.open("rb") as video:
        print('[INFO] uploading video: %s to dropbox' %(file_name))
        payload = video.read()
        dbx.files_upload(payload, remote_path, mode=dropbox.files.WriteMode("overwrite"))
        print('[INFO] completed uploading video: %s to dropbox' %(file_name))
def detect_and_capture_dropbox_freeable_space(this_client):
    """Delete the app's Dropbox folder when less than 32MB of quota remains.

    Args:
        this_client: client dict whose properties hold the Dropbox
            credentials ('dropbox_api_key', 'dropbox_app_key',
            'dropbox_app_secret').

    The purge is skipped when the allocation is neither individual nor team:
    with no known quota the remaining space cannot be computed, and treating
    the quota as 0 would delete the folder unconditionally.
    """
    props = this_client['attributes']['properties']
    dropbox_access_token = props['dropbox_api_key']
    app_key = props['dropbox_app_key']
    app_secret = props['dropbox_app_secret']
    dbx = dropbox.Dropbox(oauth2_access_token=dropbox_access_token, app_key=app_key, app_secret=app_secret)

    minimum_bytes = 32000000  # 32MB
    # One API call provides both the allocation and the used byte count.
    space_usage = dbx.users_get_space_usage()
    allocation = space_usage.allocation
    if allocation.is_individual():
        currently_allocated_bytes = allocation.get_individual().allocated
    elif allocation.is_team():
        # NOTE(review): `used` below is taken from the top-level usage object
        # while the quota is the team's - confirm this is the intended pairing.
        currently_allocated_bytes = allocation.get_team().allocated
    else:
        print('[REMOTE] unknown dropbox allocation type, skipping purge')
        return

    if (currently_allocated_bytes - space_usage.used) < minimum_bytes:
        print('[REMOTE] dropbox storage minimums reached, purging old files')
        dbx.files_delete_v2('/' + app_key)
def ping_dynamic_dns(this_client):
    """Report this host's IP addresses to the server's DDNS webhook.

    Args:
        this_client: client dict, or a falsy value when no configuration is
            available (the ping is then skipped).

    The whole operation is best-effort: failures of `hostname`, the schema
    request or the webhook post are logged and not raised.

    The addresses are sent as decoded, stripped text. The original sent
    str() of the raw bytes, i.e. a literal "b'...\\n'" wrapper around them.
    """
    if not this_client:
        print('[REMOTE ERROR] could not perform DDNS ping: no client configuration')
        return

    client_id = this_client['id']
    uuid = this_client['attributes']['properties']['client_uuid']
    webhook_authorization_key = uuid + '==' + api_key
    endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/dynamic_dns_lookup/webhook' % (api_base_url)
    schema_endpoint = '%s/api/1/monitoring_client_dynamic_dns_lookups/describe' % (api_base_url)
    request_timeout = 10  # seconds; avoids hanging forever on a dead link

    try:
        raw_ips = subprocess.check_output(['hostname', '--all-ip-addresses'])
        ips = raw_ips.decode('utf-8', errors='replace').strip()
        # allows ddns schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
        schema_response = requests.get(
            schema_endpoint,
            headers={"Authorization": 'Bearer %s' % (api_key)},
            timeout=request_timeout,
        )
        json_schema = json.loads(schema_response.content)['data']['attributes']['properties']
        # write incident to latest schema
        json_schema['monitoring_client_id'] = client_id
        json_schema['ip'] = ips
        print(json_schema, 'dns ping')
        print("[INFO] performing DDNS ping")
        response = requests.post(
            endpoint,
            json=json_schema,
            headers={"Authorization": webhook_authorization_key},
            timeout=request_timeout,
        )
        if response.status_code == 200:
            print("[REMOTE] performed DDNS ping")
        elif not response.ok:
            print('[REMOTE ERROR] DDNS ping rejected with status %s' % (response.status_code))
    except Exception as e:
        print('[REMOTE ERROR] could not perform DDNS ping: %s' % (e))
def upload_video_file_to_violet_rails(this_client, timestamp, video_file_path):
    """Upload a recorded video to the Violet Rails video-upload webhook.

    Args:
        this_client: client dict providing 'id' and 'client_uuid'.
        timestamp: sent, via str(), as `local_time_of_recording`.
        video_file_path: local path of the file to upload.

    Raises:
        Errors while fetching the incident schema propagate to the caller.
        Errors while opening or posting the file are logged and swallowed.
    """
    client_id = this_client['id']
    uuid = this_client['attributes']['properties']['client_uuid']
    webhook_authorization_key = uuid + '==' + api_key
    endpoint = '%s/api/1/monitoring_incidents/video_upload/webhook' % (api_base_url)
    schema_endpoint = '%s/api/1/monitoring_incidents/describe' % (api_base_url)

    # allows the incident schema to be dynamic since the JSON structure is loaded from the server and then hydrated with values thereafter
    schema_response = requests.get(
        schema_endpoint,
        headers={"Authorization": 'Bearer %s' % (api_key)},
        timeout=10,
    )
    json_schema = json.loads(schema_response.content)['data']['attributes']['properties']

    # write incident to latest schema
    json_schema['monitoring_client_id'] = client_id
    json_schema['evidence'] = 'see video file'
    json_schema['triggered_by'] = 'infrared-motion'
    json_schema['evidence_format'] = 'file'
    json_schema['local_file_name'] = video_file_path
    json_schema['local_time_of_recording'] = str(timestamp)

    print("[INFO] uploading evidence file to violet rails")
    try:
        # Context manager guarantees the file handle is closed after the
        # request, whether it succeeds or raises.
        with open(video_file_path, 'rb') as video_file:
            response = requests.post(
                endpoint,
                files={'file': video_file},
                data=json_schema,
                headers={"Authorization": webhook_authorization_key},
                timeout=60,  # uploads get a longer allowance than API calls
            )
        print(response.content, 'VR upload')
        if response.status_code == 200:
            print("[INFO] uploaded evidence file to violet rails")
    except Exception as e:
        print('[REMOTE ERROR] could not upload evidence file to violet rails: %s' % (e))
def _main():
    """Program entrance: run the sensor loop until interrupted with ctrl-c."""
    print ('[INFO] Program is starting...')
    try:
        loop()
    except KeyboardInterrupt:
        # ctrl-c: release the GPIO devices before exiting.
        destroy()
        print("[INFO] Ending program")


if __name__ == '__main__':
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
previous recording command:
translates to:
ffmpeg -f v4l2 -framerate 30 -i /dev/video0 -vcodec libx264 -frames 100 test.mp4
to record audio and video at the same time in ffmpeg:
adapted to existing bash command:
Note: `hw:2` is the hardware number of the microphone, taken from the output of `lsusb`.
eg:
source: https://raspberrypi.stackexchange.com/questions/49946/best-ffmpeg-configuration-for-recording-videoaudio-with-raspberrypi-and-usb-web