Created
October 17, 2019 11:55
-
-
Save kurain/6deed37300b2ac2c878afdfa7596894a to your computer and use it in GitHub Desktop.
detecting thumbturn status
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import picamera | |
import sys | |
from time import sleep | |
from tensorflow.contrib import predictor | |
from picamera import PiCamera | |
import signal | |
import slacker | |
DOOR_UNLOCKED = 0 | |
DOOR_LOCKED = 1 | |
SLACK_TOKEN='YOUR_TOKEN_XXXXXXXXXXXXXXXXXXXXXXX' | |
slack = slacker.Slacker(SLACK_TOKEN) | |
def send_message_and_upload(img_data, door_status, score): | |
if door_status == DOOR_LOCKED: | |
message = f'The door is LOCKED {score}' | |
else: | |
message = f'The door is UNLOCKED {score}' | |
slack.files.upload(img_data,channels=['general'],initial_comment=message) | |
def detect_door(img, predict_fn): | |
predictions = predict_fn( | |
{ | |
"key": ["0"], | |
"image_bytes": [img] | |
}) | |
box = predictions['detection_boxes'][0][0] | |
top_score = predictions['detection_scores'][0][0] | |
top_class = predictions['detection_classes_as_text'][0][0] | |
if top_class == b'closed_thumbturn' and top_score > 0.5: | |
return (DOOR_LOCKED, top_class, top_score, box) | |
else: | |
return (DOOR_UNLOCKED, top_class, top_score, box) | |
force_notify = False | |
def sig_handler(signum, frame): | |
print('RECEIVE SIGHUP') | |
global force_notify | |
force_notify = True | |
signal.signal(signal.SIGHUP, sig_handler) | |
def main(): | |
model_dir = sys.argv[1] | |
predict_fn = predictor.from_saved_model(model_dir) | |
print('----- loaded ---') | |
camera = PiCamera() | |
camera.resolution = (640, 480) | |
sleep(2) # warm up | |
print('----- camera ready ---') | |
prev_status = DOOR_UNLOCKED | |
global force_notify | |
while True: | |
stream = io.BytesIO() | |
camera.capture(stream, 'jpeg') | |
stream.seek(0) | |
img = stream.read() | |
door_status, top_class, top_score, box = detect_door(img, predict_fn) | |
if (prev_status != door_status) or force_notify: | |
send_message_and_upload(img, door_status, score) | |
print(','.join([str(e) for e in ['STATUS', door_status, top_class, top_score] + list(box)])) | |
force_notify = False | |
prev_status = door_status | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment