Skip to content

Instantly share code, notes, and snippets.

@TonsOfFun
Created May 14, 2018 16:17
Show Gist options
  • Save TonsOfFun/56bd2ad0e26d6320f621f6a60ebf3d63 to your computer and use it in GitHub Desktop.
Save TonsOfFun/56bd2ad0e26d6320f621f6a60ebf3d63 to your computer and use it in GitHub Desktop.
Crypto Smart Camera POC
version: '3'
services:
miner:
image: cryptotrust/rpi-cpuminer-multi
command: "cpuminer -u ${EMAIL} -a ${ALGO} -o stratum+tcp://${URL}:${PORT} -t ${THREADS}"
camera:
image: ricklon/rpi-opencv
command: python motion_capture.py
volumes:
- .:/greenthumb-rails
import imutils
def motion(current_frame=None, previous_frame=None, avg=None, min_area=2000):
motion = False
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (21, 21), 0)
# if the average frame is None, initialize it
if avg is None:
log.info("Initialising average frame")
avg = gray.copy().astype("float")
raw_capture.truncate(0)
continue
# accumulate the weighted average between the current frame and
# previous frames, then compute the difference between the current
# frame and running average
cv2.accumulateWeighted(gray, avg, 0.5)
frame_delta = cv2.absdiff(gray, cv2.convertScaleAbs(avg))
# threshold the delta image, dilate the thresholded image to fill
# in holes, then find contours on thresholded image
thresh = cv2.threshold(frame_delta, args.delta_threshold, 255, cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)
(contours, _) = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for c in contours:
# if the contour is too small, ignore it
if cv2.contourArea(c) < min_area:
continue
motion = True
log.info("Motion detected")
return motion
import io
import random
import picamera
from PIL import Image
from
import json
import re
import requests
global previous_frame
global current_frame
global last_capture_at
global avg
def upload_capture():
if current_frame is not None:
last_capture_at = time.time()
Note(host='http://greenthumb-proto.herokuapp.com',
title='This is an automated upload',
frame=current_frame,
profile=profile,
user_email='[email protected]',
user_token='sometokenhere').create_note()
# Called once in the while
# Simple way to capture an image only once per minute
def periodic_capture():
timeout = 60.0 # Sixty seconds
l = task.LoopingCall(upload_capture())
l.start(timeout) # call every sixty seconds
reactor.run()
def detect_motion(camera):
stream
camera.capture(stream, format='jpeg', use_video_port=True)
stream.seek(0)
if previous_frame is None:
previous_frame = Image.open(stream)
return False
else:
current_frame = Image.open(stream)
result = motion(current_frame, previous_frame)
previous_frame = current_frame
return result
with picamera.PiCamera() as camera:
camera.resolution = (1920, 1080)
stream = picamera.PiCameraCircularIO(camera, seconds=10)
try:
while True:
camera.wait_recording(1)
periodic_capture()
if detect_motion(camera):
print('Motion detected!')
# As soon as we detect motion, split the recording to
# record the frames "after" motion
camera.split_recording(Note())
# Write the 10 seconds "before" motion to disk as well
stream.copy_to(Note(), seconds=10)
stream.clear()
# Wait until motion is no longer detected, then split
# recording back to the in-memory circular buffer
while detect_motion(camera):
camera.wait_recording(1)
print('Motion stopped!')
camera.split_recording(stream)
finally:
camera.stop_recording()
import json
import re
import requests
class Note(object):
def __init__(self,
host=None,
title='',
body='',
profile='',
frame=BytesIO(),
frames=BytesIO(),
user_email=None,
user_token=None):
self.profile = profile
self.host = host
self.title = title
self.body = body
self.frame = frame
self.frames = frames
self.user_email = user_email
self.user_token = user_token
# http://picamera.readthedocs.io/en/release-1.12/recipes2.html#custom-outputs
def write(self, stream):
self.frames += stream
def flush(self):
self.create_note()
def get_presigned_post(self):
get_presigned_post_url = self.host +
'/notes/new.json?user_email=' + self.user_email +
'&user_token=' + self.user_token
response = requests.get(get_presigned_post_url)
presigned_post_json = json.loads(response.text)
form_data = presigned_post_json['s3']['form-data']
url = presigned_post_json['s3']['url']
return url, form_data
def form_object_files(self):
file_data = self.frame.getvalue()
if(len(frame) > 0):
filename = 'test.jpg'
mime_type = 'image/jpeg'
else:
file_data = self.frames.getvalue()
filename = 'test.mp4'
mime_type = 'video/mp4'
{ 'file': (filename, file_data, mime_type) }
def post_file(self, url, form_data):
response = requests.post(url, files=self.form_object_files(), data=form_data)
url_re = 'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
return re.findall(url_re, response.text)[0].split('<')[0]
def create_note(self, file_url=None, profile=''):
print("[INFO] getting presigned request...")
url, form_data = self.get_presigned_post()
print("[INFO] uploading file...")
file_url = self.post_file(url, form_data)
form_data = {
'note[title]': self.title,
'note[body]': self.body + ' ' + profile,
'note[file_url]': file_url,
'user_email': self.user_email,
'user_token': self.user_token
}
print("[INFO] creating note...")
response = requests.post(self.host + '/notes', data=form_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment