Skip to content

Instantly share code, notes, and snippets.

@softberries
Created June 9, 2017 12:57
Show Gist options
  • Save softberries/c573587cdd7c9db31fc3d03ea393932d to your computer and use it in GitHub Desktop.
Save softberries/c573587cdd7c9db31fc3d03ea393932d to your computer and use it in GitHub Desktop.
Access Control System with Raspberry Pi, RFiD and AWS Rekogintion
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import sys
import logging
import time
import getopt
from datetime import datetime
import picamera
import os
import tinys3
import json
# Usage
usageInfo = """Usage:
Use certificate based mutual authentication:
python rpi_rfid_rekognition.py -e <endpoint> -r <rootCAFilePath> -c <certFilePath> -k <privateKeyFilePath>
Type "python rpi_rfid_rekognition.py -h" for available options.
"""
# Help info
helpInfo = """-e, --endpoint
Your AWS IoT custom endpoint
-r, --rootCA
Root CA file path
-c, --cert
Certificate file path
-k, --key
Private key file path
-h, --help
Help information
"""
# Read in command-line parameters
host = ""
rootCAPath = ""
certificatePath = ""
privateKeyPath = ""
try:
opts, args = getopt.getopt(sys.argv[1:], "hwe:k:c:r:", ["help", "endpoint=", "key=","cert=","rootCA="])
if len(opts) == 0:
raise getopt.GetoptError("No input parameters!")
for opt, arg in opts:
if opt in ("-h", "--help"):
print(helpInfo)
exit(0)
if opt in ("-e", "--endpoint"):
host = arg
if opt in ("-r", "--rootCA"):
rootCAPath = arg
if opt in ("-c", "--cert"):
certificatePath = arg
if opt in ("-k", "--key"):
privateKeyPath = arg
except getopt.GetoptError:
print(usageInfo)
exit(1)
# Missing configuration notification
missingConfiguration = False
if not host:
print("Missing '-e' or '--endpoint'")
missingConfiguration = True
if not rootCAPath:
print("Missing '-r' or '--rootCA'")
missingConfiguration = True
if not certificatePath:
print("Missing '-c' or '--cert'")
missingConfiguration = True
if not privateKeyPath:
print("Missing '-k' or '--key'")
missingConfiguration = True
if missingConfiguration:
exit(2)
# photo properties
image_width = 800
image_height = 600
file_extension = '.png'
# AWS S3 properties
access_key_id = ''
secret_access_key = ''
bucket_name = 'your-bucket'
# RFID character map for hid device
hid = { 4: 'a', 5: 'b', 6: 'c', 7: 'd', 8: 'e', 9: 'f', 10: 'g', 11: 'h', 12: 'i', 13: 'j', 14: 'k', 15: 'l', 16: 'm', 17: 'n', 18: 'o', 19: 'p', 20: 'q', 21: 'r', 22: 's', 23: 't', 24: 'u', 25: 'v', 26: 'w', 27: 'x', 28: 'y', 29: 'z', 30: '1', 31: '2', 32: '3', 33: '4', 34: '5', 35: '6', 36: '7', 37: '8', 38: '9', 39: '0', 44: ' ', 45: '-', 46: '=', 47: '[', 48: ']', 49: '\\', 51: ';' , 52: '\'', 53: '~', 54: ',', 55: '.', 56: '/' }
hid2 = { 4: 'A', 5: 'B', 6: 'C', 7: 'D', 8: 'E', 9: 'F', 10: 'G', 11: 'H', 12: 'I', 13: 'J', 14: 'K', 15: 'L', 16: 'M', 17: 'N', 18: 'O', 19: 'P', 20: 'Q', 21: 'R', 22: 'S', 23: 'T', 24: 'U', 25: 'V', 26: 'W', 27: 'X', 28: 'Y', 29: 'Z', 30: '!', 31: '@', 32: '#', 33: '$', 34: '%', 35: '^', 36: '&', 37: '*', 38: '(', 39: ')', 44: ' ', 45: '_', 46: '+', 47: '{', 48: '}', 49: '|', 51: ':' , 52: '"', 53: '~', 54: '<', 55: '>', 56: '?' }
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
myAWSIoTMQTTClient = AWSIoTMQTTClient("basicPubSub")
myAWSIoTMQTTClient.configureEndpoint(host, 8883)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
# camera setup
camera = picamera.PiCamera()
camera.resolution = (image_width, image_height)
camera.awb_mode = 'auto'
# Start listening on RFID events
fp = open('/dev/hidraw0', 'rb')
def waitForRFIDScan():
ss = ""
shift = False
done = False
while not done:
buffer = fp.read(8)
for c in buffer:
if ord(c) > 0:
if int(ord(c)) == 40:
done = True
break;
if shift:
if int(ord(c)) == 2 :
shift = True
else:
ss += hid2[ int(ord(c)) ]
shift = False
else:
if int(ord(c)) == 2 :
shift = True
else:
ss += hid[ int(ord(c)) ]
return ss
def uploadToS3(file_name):
filepath = file_name + file_extension
camera.capture(filepath)
conn = tinys3.Connection(access_key_id, secret_access_key)
f = open(filepath, 'rb')
conn.upload(filepath, f, bucket_name,
headers={
'x-amz-meta-cache-control': 'max-age=60'
})
if os.path.exists(filepath):
os.remove(filepath)
# Custom MQTT message callback
def photoVerificationCallback(client, userdata, message):
print("Received a new message: ")
data = json.loads(message.payload)
try:
similarity = data[1][0]['Similarity']
print("Received similarity: " + str(similarity))
if(similarity >= 90):
print("Access allowed, opening doors.")
print("Thank you!")
except:
pass
print("Finished processing event.")
def checkRFIDNumber(rfidnumber):
return rfidnumber == '0004098554'
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
myAWSIoTMQTTClient.subscribe("rekognition/result", 1, photoVerificationCallback)
time.sleep(2)
# Publish to the same topic in a loop forever
while True:
print("waiting..")
scan = waitForRFIDScan()
print(scan)
if(checkRFIDNumber(scan)):
print("RFID correct, taking photo...")
uploadToS3(scan)
else:
print("Bad RFID - Access Denied")
@Pratik072
Copy link

it would be kind enough if u elaborate where what parameters go as the script throws error for uploadToS3
imag0523

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment