Created
July 9, 2020 23:20
-
-
Save mrpnelson/b2836d6d3855be013aed924db3ddd41d to your computer and use it in GitHub Desktop.
DoorBirdSentry + Slack Integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy | |
import hashlib | |
import io | |
import logging | |
import os | |
import pytz | |
import requests | |
import slack | |
import socket | |
import time | |
from chacha20poly1305 import ChaCha20Poly1305 | |
from datetime import datetime | |
from io import BytesIO | |
from nacl import pwhash, secret, utils, bindings | |
from pathlib import Path | |
from PIL import Image | |
from requests.auth import HTTPDigestAuth | |
from threading import Lock, Thread | |
import sys | |
from pympler import asizeof | |
# Size passed to the Argon2i KDF as the length of the derived key.
KEY_SIZE = 32
# NOTE(review): TESTMODE and TESTPOLLINTERVAL are not referenced anywhere in
# the visible code - confirm whether they are still needed.
TESTMODE = False
TESTPOLLINTERVAL = 3600
IMAGEPOLLRATE = 2 # Seconds after which to poll for another image
# Runtime configuration comes from the environment. Each lookup uses
# os.environ[...] and therefore raises KeyError at import time if unset.
RUNMODE = os.environ["RUNMODE"]
SLACK_TEAM_CHANNEL = os.environ["SLACK_TEAM_CHANNEL"]
SLACK_BOT = os.environ["SLACK_BOT"]
SLACK_API_TOKEN = os.environ["SLACK_API_TOKEN"]
# Used (encoded) as the password input to the key derivation for event packets.
PW_FIVE = os.environ["PW_FIRST_FIVE"]
# Credentials and URL used for the HTTP digest-authenticated image request.
DBRD_USER = os.environ["DBRD_USER"]
DBRD_PASS = os.environ["DBRD_PASS"]
DB_IMAGE_URL = os.environ["DB_IMAGE_URL"]
# Intercom ID that decrypted events are compared against.
DB_ID=os.environ["DB_ID"]
LOGFILE = os.environ["LOGFILE"]
LOGGER = logging.getLogger('doorbird-sentry-logger')
LOGGER.setLevel(logging.INFO)
# Shared buffer of camera frames (five slots); accesses elsewhere in this
# file are made while holding `lock`.
frames = [None] * 5
lock = Lock()
def main():
    """Run the DoorBird sentry: listen for event broadcasts and relay to Slack.

    Sets up logging, binds a UDP socket to the port selected by RUNMODE
    ("PRD" -> 6524, "DEV" -> 35344), primes the frame buffer, starts the
    background image poller and then loops forever on received datagrams.
    For every new event reported by doorbird_payload_processor a Slack
    message, a preview still and (when possible) an animated GIF are posted.

    Exits the process with status 1 when RUNMODE is neither "PRD" nor "DEV".
    """
    setup_logging()
    LOGGER.info('----------------------------------------')
    LOGGER.info('Starting up DoorBird sentry...')
    LOGGER.info('Run mode: %s' % (RUNMODE))
    LOGGER.info('Slack Bot: %s' % (SLACK_BOT))
    LOGGER.info('Slack Channel: %s' % (SLACK_TEAM_CHANNEL))
    LOGGER.info('----------------------------------------')

    # Create a listener for events broadcast from the DoorBird device.
    # Necessarily requires the script to be running on a system in the same
    # network/broadcast domain.
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

    # DoorBird broadcasts events on two ports. For ease of tracking
    # in dev/prd cases we'll use one port per environment.
    if RUNMODE == "PRD":
        client.bind(('<broadcast>', 6524))
    elif RUNMODE == "DEV":
        client.bind(('<broadcast>', 35344))
    else:
        LOGGER.info('[main] Need a runmode in .env to support port binding. Exiting.')
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under `python -S`).
        sys.exit(1)

    # Event time of the last event we acted on; used to drop duplicates.
    lastevent = None

    # Populate the frame buffer with a common frame to start with.
    frame_buffer_setup()

    # Poll the camera in the background; the animated GIF is only built
    # from the in-memory frames when an event actually needs it.
    Thread(target=image_buffer, args=(), daemon=True).start()

    # Start monitoring for event broadcasts from DoorBird.
    while True:
        data, _addr = client.recvfrom(1024)
        if not data:
            # A zero-length UDP datagram is valid and does not mean the
            # socket closed, so it must not terminate the service.
            continue

        # A DoorBird event will always start with deadbe (per API docs).
        payload = data.hex()
        if payload[0:6] != 'deadbe':
            continue

        try:
            # newevent is the event time, doorevent the Slack message text
            # (or False when there is nothing to announce).
            newevent, doorevent = doorbird_payload_processor(payload, lastevent)
        except Exception:
            LOGGER.info('[main] Could not process what appears to be a valid payload.')
            # Do not fall through with stale (or unbound) event variables.
            continue

        # Skip duplicates and packets that produced no announcement.
        if newevent == lastevent or doorevent is False:
            continue

        send_slack_im(doorevent)
        lastevent = newevent

        # Quick preview: post the most recent buffered frame right away.
        Thread(target=post_last_frame, args=(), daemon=True).start()

        # Then build the animated GIF and post it from a worker thread.
        gifimage = create_gif_bytesIO()
        if gifimage is not False:
            LOGGER.debug('[main] Attempting to create and post a GIF')
            gif_title = 'Bluff_DoorBird_Cam_Animated_GIF_%s.gif' % (time.strftime("%c"))
            Thread(target=post_bytes_to_channel, args=(gifimage, gif_title), daemon=True).start()
        else:
            # Send a still image if we can't send a GIF.
            LOGGER.error("[main] Unable to create a GIF, so sending a still image instead.")
            Thread(target=post_last_frame, args=(), daemon=True).start()
def create_gif_bytesIO():
    """Build an animated GIF from the shared frame buffer, in memory.

    The buffer is read while holding `lock`. Everything that touches the
    frames happens inside the try block so that a bad buffer entry results
    in a False return instead of an exception raised with the lock held.

    Returns:
        A BytesIO containing the GIF, or False if it could not be built.
    """
    gif_bytesIO = BytesIO()
    with lock:
        try:
            # Sizing every frame is only worth the cost when DEBUG is on.
            if LOGGER.isEnabledFor(logging.DEBUG):
                LOGGER.debug('[create_gif_bytesIO] ----- Frame buffer length before writing GIF: %i' % len(frames))
                for i, frame in enumerate(frames):
                    LOGGER.debug('[create_gif_bytesIO] Image list element %i dimensions and size: %s, %s' % (i, frame.size, asizeof.asizeof(frame)))
            # Four short frames followed by a longer hold on the newest one.
            frames[0].save(gif_bytesIO, format='GIF',
                           append_images=frames[1:],
                           save_all=True,
                           optimize=False,
                           duration=[500, 500, 500, 500, 3000], loop=0)
            LOGGER.info('[create_gif_bytesIO] Creating a GIF with size: %s KB' % round(asizeof.asizeof(gif_bytesIO)/1024, 2))
        except Exception:
            LOGGER.error('[create_gif_bytesIO] Unable to build or write the animated gif.')
            return False
    return gif_bytesIO
def post_last_frame():
    """Post the newest buffered frame to Slack as a still preview image.

    Intended to be run as a thread target. Failures (an unusable frame or
    an upload error) are logged and swallowed so the thread ends quietly.
    """
    # Copy the newest frame under the lock, then release it before the
    # comparatively slow encode and upload.
    with lock:
        image = copy.deepcopy(frames[-1])

    try:
        # Encode to an in-memory GIF and upload it.
        imagedata = BytesIO()
        image.save(imagedata, 'GIF')
        post_bytes_to_channel(imagedata, 'Bluff_DoorBird_Cam_-_Preview_Image.gif')
    except Exception:
        LOGGER.error('[post_last_frame] Error posting the last frame to Slack')
def image_buffer():
    """Poll the camera forever and keep the shared frame buffer current.

    Intended to be run as a daemon thread. Every IMAGEPOLLRATE seconds an
    image is fetched and processed; a successfully processed frame replaces
    the oldest entry in `frames`. When no image is returned the loop waits
    one extra IMAGEPOLLRATE before the next attempt.
    """
    while True:
        # The image buffer grabs a frame every IMAGEPOLLRATE seconds.
        time.sleep(IMAGEPOLLRATE)

        image = False
        try:
            image = db_image_to_mem()
        except Exception:
            LOGGER.info('[image_buffer] Error grabbing a frame for the image_buffer...')

        if image is False:
            # No new image: pause once more here, which together with the
            # sleep at the top of the loop gives the logged backoff.
            LOGGER.info('[image_buffer] Image frame grabber did not return an image. Backoff %s seconds and try again...' % (2*IMAGEPOLLRATE))
            time.sleep(IMAGEPOLLRATE)
            continue

        # gif_frame_process reports failure by returning False rather than
        # raising, so both outcomes have to be checked before buffering.
        try:
            frame = gif_frame_process(image)
        except Exception:
            frame = False
        if frame is False:
            LOGGER.error("[image_buffer] Can't create a gif in gif_tester (was unable to build)")
            continue

        with lock:
            # Drop the oldest frame and append the newest.
            frames.pop(0)
            frames.append(frame)
def frame_buffer_setup():
    """Fill the frame buffer with five copies of one freshly grabbed frame.

    Blocks, retrying once per second, until an image has been both fetched
    and successfully processed, so the buffer never starts out holding
    unusable entries.
    """
    global frames
    while True:
        try:
            base_frame_bytes = db_image_to_mem()
        except Exception:
            LOGGER.info('[frame_buffer_setup] Error grabbing the initial frame, retrying...')
            base_frame_bytes = False

        base_image = False
        if base_frame_bytes is not False:
            # May itself return False when the image cannot be decoded.
            base_image = gif_frame_process(base_frame_bytes)

        if base_image is not False:
            with lock:
                frames = [base_image] * 5
            return

        time.sleep(1)
def gif_frame_process(imagebytes):
    """Resize a camera image to 448x336 and round-trip it through GIF.

    The original author notes that, per the Pillow docs, this open/save
    round trip helps to create more efficient GIFs.

    Args:
        imagebytes: File-like object holding the source image data.

    Returns:
        A PIL Image opened from the re-encoded GIF data, or False if the
        image could not be processed.
    """
    try:
        resized = Image.open(imagebytes).resize((448, 336))
        fobj = BytesIO()
        resized.save(fobj, 'GIF', optimize=True)
        return Image.open(fobj)
    except Exception as err:
        # Keep the False-on-failure contract, but leave a trace of why and
        # do not trap KeyboardInterrupt/SystemExit as a bare except would.
        LOGGER.warning('[gif_frame_process] Unable to process frame: %s', err)
        return False
def db_image_to_mem():
    """Fetch the current camera image from the DoorBird image URL.

    Uses HTTP digest authentication with DBRD_USER / DBRD_PASS.

    Returns:
        A BytesIO wrapping the response body when the API answers 200,
        otherwise False (request failure or any other status code).
    """
    try:
        # A timeout keeps the polling thread from hanging forever on a
        # camera that accepts the connection but never answers.
        req = requests.get(
            DB_IMAGE_URL,
            auth=HTTPDigestAuth(DBRD_USER, DBRD_PASS),
            timeout=10,
        )
    except Exception as err:
        # There is no response object here, so there is no status code to
        # report; log the failure itself instead.
        LOGGER.warning('[db_image_to_mem] API image grab error: request failed: %s', err)
        return False

    if req.status_code == 200:
        return BytesIO(req.content)

    LOGGER.warning('[db_image_to_mem] API image grab error: Status code from API not 200, but was: %s' % req.status_code)
    return False
def doorbird_payload_processor(payload, lastevent):
    """Decrypt and interpret one DoorBird event broadcast.

    Args:
        payload: Hex string of the received datagram (starts with "deadbe").
        lastevent: Event time ("HH:MM:SS") of the previously handled event,
            or None; used for duplicate suppression.

    Returns:
        A tuple (eventtime, doorevent). For a new event, eventtime is the
        event's "HH:MM:SS" time in America/Denver and doorevent is the
        Slack message text. In every other case (unknown version,
        undecryptable packet, foreign intercom, unparsable or duplicate
        event) the tuple is (lastevent, False).
    """
    version = int(payload[6:8], 16)
    if version != 1:
        # Shouldn't get here without a major API upgrade, but here just in
        # case we need to catch this change one day.
        LOGGER.warning('[doorbird_payload_processor] Received what appears to be a valid Doorbird packet, but the event version is unknown.')
        return (lastevent, False)

    # Break the payload out into its components (offsets are hex digits).
    # NOTE(review): opslim/memlim come straight from the network packet and
    # drive the cost of the KDF below - consider bounding them.
    opslim = int(payload[8:16], 16)
    memlim = int(payload[16:24], 16)
    salt = bytes.fromhex(payload[24:56])
    nonce = bytes.fromhex(payload[56:72])
    ciphertext = bytes.fromhex(payload[72:])

    # The 'password' used to encrypt the data is actually the first five
    # characters of the password of a user account that has valid
    # permissions in the DoorBird app. A broadcast appears to occur for
    # each user in the system, but only those with correct permissions can
    # decrypt their respective packet.
    password = PW_FIVE.encode()
    derivedkey = pwhash.argon2i.kdf(KEY_SIZE, password, salt, opslimit=opslim, memlimit=memlim)

    # Attempt to recover the plaintext for the event.
    try:
        plaintext = bindings.crypto_aead_chacha20poly1305_decrypt(ciphertext, None, nonce, derivedkey)
    except Exception:
        LOGGER.info('[doorbird_payload_processor] Unable to decrypt provided payload - skipping it')
        return (lastevent, False)

    # Plaintext layout used here: 6 bytes intercom ID, 8 bytes event
    # (space padded), 4 bytes big-endian timestamp.
    if len(plaintext) < 18:
        LOGGER.info('[doorbird_payload_processor] Decrypted payload is too short - skipping it')
        return (lastevent, False)
    try:
        intercom_id = plaintext[0:6].decode()
        event_field = plaintext[6:14].decode().strip()
    except UnicodeDecodeError:
        LOGGER.info('[doorbird_payload_processor] Decrypted payload is not valid text - skipping it')
        return (lastevent, False)

    # Only handle events from our own intercom. Always hand back a tuple so
    # the caller's unpacking cannot fail on a foreign device.
    if intercom_id != DB_ID:
        LOGGER.debug('[doorbird_payload_processor] Ignoring event for another intercom')
        return (lastevent, False)

    # The event field is parsed as a number; anything else is skipped
    # explicitly rather than surfacing as a ValueError in the caller.
    try:
        event = int(event_field, 16)
    except ValueError:
        LOGGER.info('[doorbird_payload_processor] Ignoring non-numeric event type: %r', event_field)
        return (lastevent, False)

    if event == 1:
        doorevent = ":bell: @here There is a vistor at the main door!\n\n:unlock: Unlock door if appropriate."
    else:
        doorevent = ":no_bell: The doorbell just registered an unknown event (%s)" % event

    # Convert the epoch timestamp via UTC so the result is correct no
    # matter which timezone the host itself is configured for.
    epoch_seconds = int.from_bytes(plaintext[14:18], 'big')
    timestamp = datetime.fromtimestamp(epoch_seconds, pytz.utc).astimezone(pytz.timezone('America/Denver'))
    eventtime = timestamp.strftime("%H:%M:%S")

    # A new event should have a different timestamp from the last event...
    if eventtime != lastevent:
        LOGGER.info('[doorbird_payload_processor] Unwrapped a new DoorBird event for handling')
        return (eventtime, doorevent)

    # ...but if it doesn't, we've probably acquired a duplicate somehow.
    LOGGER.info('[doorbird_payload_processor] Suppressing a duplicate DoorBird event')
    return (lastevent, False)
def send_slack_im(payload, channel=SLACK_TEAM_CHANNEL, token=SLACK_API_TOKEN, botuser=SLACK_BOT):
    """Post a text message to a Slack channel.

    Args:
        payload: Message text to send.
        channel: Destination channel (defaults to SLACK_TEAM_CHANNEL).
        token: Slack API token (defaults to SLACK_API_TOKEN).
        botuser: Username to post under (defaults to SLACK_BOT).
    """
    message_args = {
        'channel': channel,
        'text': payload,
        'link_names': 1,
        'username': botuser,
        'as_user': 'true',
    }
    # A fresh client per call, with a 30 second timeout.
    slack.WebClient(token, timeout=30).chat_postMessage(**message_args)
def post_bytes_to_channel(filestream, title, channel=SLACK_TEAM_CHANNEL, token=SLACK_API_TOKEN):
    """Upload an in-memory image to a Slack channel.

    Args:
        filestream: BytesIO holding the image data.
        title: Filename to give the upload.
        channel: Destination channel (defaults to SLACK_TEAM_CHANNEL).
        token: Slack API token (defaults to SLACK_API_TOKEN).
    """
    # Log an MD5 of the content and the object's measured size for debugging.
    digest = hashlib.md5(filestream.getvalue()).hexdigest()
    LOGGER.debug('[post_bytes_to_channel] Image Hash: %s' % digest)
    size_kb = round(asizeof.asizeof(filestream)/1024,2)
    LOGGER.debug('[post_bytes_to_channel] Image Size: %sKB' % (size_kb))

    uploader = slack.WebClient(token, timeout=30)
    upload_args = {
        'channels': channel,
        'as_user': 'true',
        'filename': title,
        'file': filestream.getbuffer(),
    }
    uploader.files_upload(**upload_args)
def setup_logging():
    """Attach file and console handlers to LOGGER.

    Creates the directory for LOGFILE if needed and verifies the file can
    be opened for appending. Prints a message and exits the process with
    status 1 if either step fails.
    """
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

    logdir = os.path.dirname(os.path.realpath(LOGFILE))
    try:
        # exist_ok avoids the check-then-create race of a separate isdir().
        os.makedirs(logdir, exist_ok=True)
    except OSError:
        print("[setup_logging] Error creating log directory %s" % logdir)
        sys.exit(1)

    # Make sure the log file exists and is writable before handing it to
    # the logging module.
    try:
        with open(LOGFILE, 'a'):
            pass
    except OSError:
        print('[setup_logging] Unable to access or create the log file in specified path (%s). Exiting' % LOGFILE)
        sys.exit(1)

    # File logging first, then console logging, both with the same format.
    for handler in (logging.FileHandler(LOGFILE), logging.StreamHandler()):
        handler.setFormatter(formatter)
        LOGGER.addHandler(handler)
# Start the sentry only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment