Created
May 14, 2018 16:17
-
-
Save TonsOfFun/56bd2ad0e26d6320f621f6a60ebf3d63 to your computer and use it in GitHub Desktop.
Crypto Smart Camera POC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Docker Compose stack with two services: a CPU miner and a camera container
# that runs the motion-capture script.
version: '3'
services:
  # EMAIL, ALGO, URL, PORT and THREADS are substituted by Compose from the
  # shell environment (or an .env file) when the stack is started.
  miner:
    image: cryptotrust/rpi-cpuminer-multi
    command: "cpuminer -u ${EMAIL} -a ${ALGO} -o stratum+tcp://${URL}:${PORT} -t ${THREADS}"
  camera:
    image: ricklon/rpi-opencv
    command: python motion_capture.py
    volumes:
      # NOTE(review): the project directory is mounted at /greenthumb-rails,
      # but no working_dir is set; confirm the image's working directory is
      # where motion_capture.py ends up.
      - .:/greenthumb-rails
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import imutils | |
def motion(current_frame=None, previous_frame=None, avg=None, min_area=2000,
           delta_threshold=5):
    """Return True when ``current_frame`` differs enough from the baseline.

    The frame is converted to blurred grayscale, blended into a running
    average, and the difference between the two is thresholded. Motion is
    reported when any contour of the thresholded difference covers at least
    ``min_area`` pixels.

    Args:
        current_frame: Image to test. Anything ``numpy.asarray`` accepts
            (an array or a PIL image). ``None`` yields ``False``.
        previous_frame: Earlier image used to seed the baseline when ``avg``
            is not supplied.
        avg: Float running-average array. It is updated in place, so a
            caller that keeps the array carries the average between calls.
        min_area: Smallest contour area, in pixels, that counts as motion.
        delta_threshold: Minimum per-pixel difference that survives the
            threshold. The original read this from an undefined ``args``
            object; 5 is a starting value to tune, not a measured one.

    Returns:
        bool: ``True`` if motion was detected, otherwise ``False``.
    """
    if current_frame is None:
        return False

    # Imported here because no import of these names is visible at module
    # level; this keeps the function usable on its own.
    import logging

    import cv2
    import numpy as np

    log = logging.getLogger(__name__)

    def blurred_gray(image):
        # NOTE(review): PIL images are RGB while the conversion below assumes
        # BGR; that only changes the channel weights of the gray image.
        pixels = np.asarray(image)
        if pixels.ndim == 3:
            pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2GRAY)
        return cv2.GaussianBlur(pixels, (21, 21), 0)

    gray = blurred_gray(current_frame)

    # Without a running average, seed it from the previous frame; with
    # neither there is nothing to compare against yet.
    if avg is None:
        if previous_frame is None:
            log.info("Initialising average frame")
            return False
        avg = blurred_gray(previous_frame).astype("float")

    # accumulate the weighted average between the current frame and
    # previous frames, then compute the difference between the current
    # frame and running average
    cv2.accumulateWeighted(gray, avg, 0.5)
    frame_delta = cv2.absdiff(gray, cv2.convertScaleAbs(avg))

    # threshold the delta image, dilate the thresholded image to fill
    # in holes, then find contours on thresholded image
    thresh = cv2.threshold(frame_delta, delta_threshold, 255,
                           cv2.THRESH_BINARY)[1]
    thresh = cv2.dilate(thresh, None, iterations=2)

    # findContours returns (contours, hierarchy) on OpenCV 2.4/4.x and
    # (image, contours, hierarchy) on 3.x; [-2] is the contour list on all.
    contours = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
                                cv2.CHAIN_APPROX_SIMPLE)[-2]

    # Contours smaller than min_area are ignored.
    detected = any(cv2.contourArea(c) >= min_area for c in contours)
    if detected:
        log.info("Motion detected")
    return detected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import random | |
import picamera | |
from PIL import Image | |
from | |
import json | |
import re | |
import requests | |
# Shared state for the capture loop. A module-level ``global`` statement does
# not create a name, so each one is bound explicitly; functions that rebind
# them must declare them ``global`` themselves.
previous_frame = None   # last still handed to the motion check
current_frame = None    # most recent still; this is what gets uploaded
last_capture_at = None  # time.time() of the last upload, None before the first
avg = None              # running-average frame for motion detection
def upload_capture():
    """Upload the most recent frame as a note and record when that happened.

    Does nothing when no frame has been captured yet.
    """
    # Without this declaration the assignment below only created a local
    # variable, so the module-level timestamp never changed.
    global last_capture_at

    import io
    import time

    frame = globals().get('current_frame')
    if frame is None:
        return

    # Note reads the attachment through ``getvalue()``. detect_motion stores
    # a PIL image, which has no such method, so encode it into a buffer.
    if not hasattr(frame, 'getvalue'):
        encoded = io.BytesIO()
        frame.save(encoded, format='JPEG')
        frame = encoded

    last_capture_at = time.time()
    # NOTE(review): ``profile`` is not defined anywhere in the visible
    # source; fall back to Note's own default (empty string) when absent.
    # The e-mail and token below are placeholders.
    Note(host='http://greenthumb-proto.herokuapp.com',
         title='This is an automated upload',
         frame=frame,
         profile=globals().get('profile', ''),
         user_email='[email protected]',
         user_token='sometokenhere').create_note()
# Called on every pass of the main loop.
# Simple way to capture an image only once per minute.
def periodic_capture():
    """Upload a capture unless one was already sent within the last minute.

    The original built a Twisted ``LoopingCall`` from the *result* of
    ``upload_capture()`` and then called ``reactor.run()``, which blocks the
    capture loop. A timestamp check gives the same once-a-minute cadence
    without blocking, and the first call uploads immediately.
    """
    global last_capture_at

    import time

    timeout = 60.0  # Sixty seconds
    now = time.time()
    previous = globals().get('last_capture_at')
    if previous is not None and now - previous < timeout:
        return

    # Stamp before uploading so a failing upload is retried after the
    # interval rather than on every loop pass.
    last_capture_at = now
    upload_capture()
def detect_motion(camera):
    """Capture one still from ``camera`` and compare it with the previous one.

    Args:
        camera: Object whose ``capture(output, format=..., use_video_port=...)``
            method writes a JPEG into ``output``.

    Returns:
        ``False`` on the first call (there is nothing to compare against yet),
        otherwise whatever ``motion(current, previous)`` returns.
    """
    # Both names are rebound below; without this declaration they would be
    # locals and the first read would raise UnboundLocalError.
    global previous_frame, current_frame

    # Use a private in-memory buffer for the still. The module-level
    # ``stream`` is the circular video buffer and must not receive JPEG data.
    still = io.BytesIO()
    camera.capture(still, format='jpeg', use_video_port=True)
    still.seek(0)
    image = Image.open(still)

    prior = globals().get('previous_frame')
    if prior is None:
        previous_frame = image
        return False

    current_frame = image
    result = motion(current_frame, prior)
    # Once motion detection is done, the current frame becomes the baseline.
    previous_frame = current_frame
    return result
# Main loop: record continuously into a 10-second in-memory ring buffer and,
# while motion is seen, divert the recording to a Note output.
with picamera.PiCamera() as camera:
    camera.resolution = (1920, 1080)
    stream = picamera.PiCameraCircularIO(camera, seconds=10)
    # Recording must be started before wait_recording/split_recording/
    # stop_recording can be used; the original never started it.
    camera.start_recording(stream, format='h264')
    try:
        while True:
            camera.wait_recording(1)
            periodic_capture()
            if detect_motion(camera):
                print('Motion detected!')
                # As soon as we detect motion, split the recording to
                # record the frames "after" motion
                # NOTE(review): Note() is built without host or credentials
                # here, while its flush() -> create_note() concatenates
                # self.host; confirm the intended arguments.
                camera.split_recording(Note())
                # Write the 10 seconds "before" motion out as well
                stream.copy_to(Note(), seconds=10)
                stream.clear()
                # Wait until motion is no longer detected, then split
                # recording back to the in-memory circular buffer
                while detect_motion(camera):
                    camera.wait_recording(1)
                print('Motion stopped!')
                camera.split_recording(stream)
    finally:
        camera.stop_recording()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import json
import re

import requests
class Note(object):
    """A note with one media attachment, uploaded to a remote host.

    An instance can also be handed to picamera as a custom output: recorded
    data arrives through ``write`` and the note is created on ``flush``.

    Args:
        host: Base URL of the server, e.g. ``'http://example.com'``.
        title: Note title.
        body: Note body text.
        profile: Text appended to the body when ``create_note`` is not given
            its own ``profile``.
        frame: Buffer with a single image; a fresh ``BytesIO`` when omitted.
        frames: Buffer with video data; a fresh ``BytesIO`` when omitted.
        user_email: E-mail sent with every request.
        user_token: Token sent with every request.
    """

    # Pattern used to pull the uploaded file's URL out of the upload response.
    _URL_RE = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')

    def __init__(self,
                 host=None,
                 title='',
                 body='',
                 profile='',
                 frame=None,
                 frames=None,
                 user_email=None,
                 user_token=None):
        self.profile = profile
        self.host = host
        self.title = title
        self.body = body
        # Buffers are created per instance; a BytesIO() default argument
        # would be shared by every Note built without one.
        self.frame = io.BytesIO() if frame is None else frame
        self.frames = io.BytesIO() if frames is None else frames
        self.user_email = user_email
        self.user_token = user_token

    # http://picamera.readthedocs.io/en/release-1.12/recipes2.html#custom-outputs
    def write(self, stream):
        """Append a chunk of bytes to the video buffer; return its length."""
        return self.frames.write(stream)

    def flush(self):
        """Create the note from whatever has been buffered so far."""
        self.create_note()

    def get_presigned_post(self):
        """Ask the host for upload details.

        Returns:
            tuple: ``(url, form_data)`` taken from the ``s3`` entry of the
            JSON response.

        Raises:
            requests.HTTPError: If the host answers with an error status.
        """
        # Passing the credentials as params lets requests URL-encode them.
        response = requests.get(
            self.host + '/notes/new.json',
            params={'user_email': self.user_email,
                    'user_token': self.user_token})
        response.raise_for_status()
        presigned_post_json = json.loads(response.text)
        form_data = presigned_post_json['s3']['form-data']
        url = presigned_post_json['s3']['url']
        return url, form_data

    def form_object_files(self):
        """Build the ``files`` mapping for the upload request.

        The single image is used when its buffer is non-empty; otherwise the
        video buffer is sent.
        """
        file_data = self.frame.getvalue()
        if file_data:
            filename = 'test.jpg'
            mime_type = 'image/jpeg'
        else:
            file_data = self.frames.getvalue()
            filename = 'test.mp4'
            mime_type = 'video/mp4'
        return {'file': (filename, file_data, mime_type)}

    def post_file(self, url, form_data):
        """Upload the attachment and return the first URL in the response.

        Raises:
            IndexError: If the response text contains no URL.
        """
        response = requests.post(url, files=self.form_object_files(),
                                 data=form_data)
        matches = self._URL_RE.findall(response.text)
        if not matches:
            raise IndexError('no URL found in upload response')
        # The match can run into following markup; cut it at the first '<'.
        return matches[0].split('<')[0]

    def create_note(self, file_url=None, profile=''):
        """Upload the attachment if needed, then create the note.

        Args:
            file_url: URL of an already uploaded file. When ``None`` the
                attachment is uploaded first and its URL is used.
            profile: Text appended to the body; falls back to the value given
                to the constructor when empty.

        Returns:
            The response of the note-creation request.
        """
        if file_url is None:
            print("[INFO] getting presigned request...")
            url, form_data = self.get_presigned_post()
            print("[INFO] uploading file...")
            file_url = self.post_file(url, form_data)
        form_data = {
            'note[title]': self.title,
            'note[body]': self.body + ' ' + (profile or self.profile),
            'note[file_url]': file_url,
            'user_email': self.user_email,
            'user_token': self.user_token,
        }
        print("[INFO] creating note...")
        return requests.post(self.host + '/notes', data=form_data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment