Skip to content

Instantly share code, notes, and snippets.

@mrpnelson
Created July 9, 2020 23:20
Show Gist options
  • Save mrpnelson/b2836d6d3855be013aed924db3ddd41d to your computer and use it in GitHub Desktop.
Save mrpnelson/b2836d6d3855be013aed924db3ddd41d to your computer and use it in GitHub Desktop.
DoorBirdSentry + Slack Integration
import copy
import hashlib
import io
import logging
import os
import pytz
import requests
import slack
import socket
import time
from chacha20poly1305 import ChaCha20Poly1305
from datetime import datetime
from io import BytesIO
from nacl import pwhash, secret, utils, bindings
from pathlib import Path
from PIL import Image
from requests.auth import HTTPDigestAuth
from threading import Lock, Thread
import sys
from pympler import asizeof
# Size argument handed to the Argon2i KDF when deriving the event decryption key.
KEY_SIZE = 32
# TESTMODE / TESTPOLLINTERVAL are not referenced in the visible part of this file.
TESTMODE = False
TESTPOLLINTERVAL = 3600
IMAGEPOLLRATE = 2 # Seconds after which to poll for another image
# Runtime configuration. Every value below is required: a missing environment
# variable raises KeyError at import time.
# RUNMODE selects the UDP port main() listens on ("PRD" or "DEV").
RUNMODE = os.environ["RUNMODE"]
# Default Slack channel for messages and file uploads.
SLACK_TEAM_CHANNEL = os.environ["SLACK_TEAM_CHANNEL"]
# Username passed along with chat messages.
SLACK_BOT = os.environ["SLACK_BOT"]
# Token used to build the Slack WebClient.
SLACK_API_TOKEN = os.environ["SLACK_API_TOKEN"]
# Used (encoded) as the KDF password when decrypting DoorBird event broadcasts.
PW_FIVE = os.environ["PW_FIRST_FIVE"]
# HTTP digest credentials for the camera image URL.
DBRD_USER = os.environ["DBRD_USER"]
DBRD_PASS = os.environ["DBRD_PASS"]
# URL fetched to obtain a still image from the camera.
DB_IMAGE_URL = os.environ["DB_IMAGE_URL"]
# Intercom ID that a decrypted event must carry to be handled.
DB_ID=os.environ["DB_ID"]
# Path of the log file written by setup_logging().
LOGFILE = os.environ["LOGFILE"]
LOGGER = logging.getLogger('doorbird-sentry-logger')
LOGGER.setLevel(logging.INFO)
# Rolling buffer of the five most recent camera frames (oldest first).
frames = [None] * 5
# Guards every read/write of `frames` across the worker threads.
lock = Lock()
def _start_daemon(target, *args):
    """Run ``target(*args)`` on a daemon thread and return the started Thread."""
    worker = Thread(target=target, args=args, daemon=True)
    worker.start()
    return worker


def main():
    """Listen for DoorBird UDP event broadcasts and relay them to Slack.

    Sets up logging, binds a broadcast listener on the port selected by
    RUNMODE, primes the frame buffer, starts the background frame grabber and
    then loops forever handling event packets. For each new event a text
    message, a quick still image and (when possible) an animated GIF are
    posted to Slack.

    Exits the process with status 1 when RUNMODE is neither "PRD" nor "DEV".
    """
    setup_logging()
    LOGGER.info('----------------------------------------')
    LOGGER.info('Starting up DoorBird sentry...')
    LOGGER.info('Run mode: %s', RUNMODE)
    LOGGER.info('Slack Bot: %s', SLACK_BOT)
    LOGGER.info('Slack Channel: %s', SLACK_TEAM_CHANNEL)
    LOGGER.info('----------------------------------------')

    # DoorBird broadcasts events on two ports. For ease of tracking
    # in dev/prd cases we'll use one port per environment.
    if RUNMODE == "PRD":
        port = 6524
    elif RUNMODE == "DEV":
        port = 35344
    else:
        LOGGER.info('[main] Need a runmode in .env to support port binding. Exiting.')
        sys.exit(1)

    # Create a listener for events broadcast from the DoorBird device.
    # Necessarily requires the script to be running on a system in the same
    # network/broadcast domain. The context manager closes the socket on exit.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as client:
        client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        client.bind(('<broadcast>', port))

        # Identifier of the last event we acted on, used to drop duplicates.
        lastevent = None

        # Populate the frame buffer with a common frame to start with
        frame_buffer_setup()

        # Periodically poll images into the in-memory buffer. The animated
        # GIF is only assembled from that buffer when an event needs it.
        _start_daemon(image_buffer)

        # Start monitoring for event broadcasts from DoorBird
        while True:
            data, _addr = client.recvfrom(1024)
            # A zero-length UDP datagram is a valid packet, not a closed
            # connection, so skip it instead of shutting the listener down.
            if not data:
                continue

            # When we get data, we'll convert to a hex string
            payload = data.hex()
            # A doorbird event will always start with deadbe (per API docs)
            if payload[0:6] != 'deadbe':
                continue

            try:
                # newevent is the identifier (time string) of the event and
                # doorevent is the message to send via Slack, or False when
                # the packet should be ignored.
                newevent, doorevent = doorbird_payload_processor(payload, lastevent)
            except Exception:
                LOGGER.exception('[main] Could not process what appears to be a valid payload.')
                # Nothing trustworthy was produced for this packet; never
                # fall through with values left over from an earlier one.
                continue

            # To avoid possible duplicate events, confirm our current event
            # isn't also our previous event.
            if newevent == lastevent or doorevent is False:
                continue

            try:
                send_slack_im(doorevent)
            except Exception:
                # A Slack outage must not take the listener down; the image
                # posts below are still attempted on their own threads.
                LOGGER.exception('[main] Unable to send the event message to Slack.')

            # This event becomes the reference for duplicate checking.
            lastevent = newevent

            # Quick preview: post the most recent buffered frame.
            _start_daemon(post_last_frame)

            # Create the gif and then spawn the post worker thread
            gifimage = create_gif_bytesIO()
            if gifimage is not False:  # Try to send a GIF, if we have one
                LOGGER.debug('[main] Attempting to create and post a GIF')
                _start_daemon(
                    post_bytes_to_channel,
                    gifimage,
                    'Bluff_DoorBird_Cam_Animated_GIF_%s.gif' % (time.strftime("%c")),
                )
            else:  # Send a still image if we can't send a GIF
                LOGGER.error("[main] Unable to create a GIF, so sending a still image instead.")
                _start_daemon(post_last_frame)
# Return a bytesIO object of the GIF after creating it.
def create_gif_bytesIO():
    """Render the current frame buffer into an in-memory animated GIF.

    Returns:
        A BytesIO holding the GIF, or False when the GIF could not be built
        (for example because a buffered frame is not a valid image).
    """
    gif_bytesIO = BytesIO()
    # `with` guarantees the lock is released even if a frame is unusable;
    # leaving it held would stall the frame grabber thread permanently.
    with lock:
        try:
            LOGGER.debug('[create_gif_bytesIO] ----- Frame buffer length before writing GIF: %i', len(frames))
            # Deep-sizing every frame is expensive, so only do it when the
            # debug output will actually be emitted.
            if LOGGER.isEnabledFor(logging.DEBUG):
                for i, frame in enumerate(frames):
                    LOGGER.debug(
                        '[create_gif_bytesIO] Image list element %i dimensions and size: %s, %s',
                        i, frame.size, asizeof.asizeof(frame),
                    )
            # The first four frames show for 500 ms each; the last lingers 3 s.
            frames[0].save(gif_bytesIO, format='GIF',
                           append_images=frames[1:],
                           save_all=True,
                           optimize=False,
                           duration=[500, 500, 500, 500, 3000], loop=0)
            LOGGER.info('[create_gif_bytesIO] Creating a GIF with size: %s KB',
                        round(asizeof.asizeof(gif_bytesIO) / 1024, 2))
        except Exception:
            LOGGER.exception('[create_gif_bytesIO] Unable to build or write the animated gif.')
            return False
    return gif_bytesIO
# This function will trigger an image post to Slack of the last frame.
# Should be spawned as a thread.
def post_last_frame():
    """Post the newest buffered frame to Slack as a still GIF image.

    Any failure (unusable frame, encoding error, Slack error) is logged and
    swallowed so the worker thread ends quietly.
    """
    try:
        # Copy the most recent frame while holding the lock so the buffer
        # can keep rotating while we encode and upload.
        with lock:
            image = copy.deepcopy(frames[-1])
        # Convert to a bytesIO object
        imagedata = BytesIO()
        image.save(imagedata, 'GIF')
        # Post to the slack
        post_bytes_to_channel(imagedata, 'Bluff_DoorBird_Cam_-_Preview_Image.gif')
    except Exception:
        LOGGER.exception('[post_last_frame] Error posting the last frame to Slack')
# This function should be spawned as a thread. It will periodically poll the camera
# and update the frame buffer.
def image_buffer():
    """Continuously refresh the shared frame buffer from the camera.

    Runs forever: every IMAGEPOLLRATE seconds a still is fetched, converted
    to a GIF frame and rotated into `frames` (oldest dropped, newest
    appended). Frames that cannot be fetched or converted are skipped so the
    buffer only ever receives usable images.
    """
    while True:
        # The image buffer will grab a frame every IMAGEPOLLRATE seconds.
        time.sleep(IMAGEPOLLRATE)

        # Acquire an image and store in memory
        try:
            image = db_image_to_mem()
        except Exception:
            LOGGER.info('[image_buffer] Error grabbing a frame for the image_buffer...')
            image = False

        if image is False:
            # If we don't have a new image, we'll try again after a longer pause
            LOGGER.info('[image_buffer] Image frame grabber did not return an image. Backoff %s seconds and try again...', 2 * IMAGEPOLLRATE)
            time.sleep(IMAGEPOLLRATE)
            continue

        # Process the image for optimization purposes. gif_frame_process
        # reports failure by returning False, so treat that the same as an
        # exception instead of storing False in the buffer.
        try:
            frame = gif_frame_process(image)
        except Exception:
            frame = False
        if frame is False:
            LOGGER.error("[image_buffer] Can't create a gif in gif_tester (was unable to build)")
            continue

        with lock:
            # Drop first entry in list
            frames.pop(0)
            # Add new entry at end of list
            frames.append(frame)
# Function to populate the frame buffer with a common image on startup (and potentially some exception cases)
def frame_buffer_setup():
    """Fill every slot of the frame buffer with one freshly grabbed frame.

    Blocks, retrying once per second, until a still has been both fetched
    and converted successfully, so the buffer never starts out holding
    placeholder or failed frames.
    """
    global frames
    while True:
        try:
            base_frame_bytes = db_image_to_mem()
            base_image = False
            if base_frame_bytes is not False:
                base_image = gif_frame_process(base_frame_bytes)
        except Exception:
            # An unreachable camera at startup should be retried, not fatal.
            LOGGER.exception('[frame_buffer_setup] Error acquiring the initial frame; retrying.')
            base_image = False

        if base_image is not False:
            with lock:
                frames = [base_image] * 5
            return

        time.sleep(1)
# Per Pillow docs, this open/save for acquired data helps to create more efficient GIFs.
def gif_frame_process(imagebytes, size=(448, 336)):
    """Convert raw image data into a resized, GIF-encoded Pillow image.

    Args:
        imagebytes: File-like object (or path) that Pillow can open.
        size: (width, height) the frame is resized to. Defaults to 448x336.

    Returns:
        The re-opened GIF frame as a Pillow image, or False when the data
        could not be opened, resized or encoded.
    """
    try:
        fobj = BytesIO()
        # Round-trip through an optimized GIF so every buffered frame is
        # already in the format used for the final animation.
        Image.open(imagebytes).resize(size).save(fobj, 'GIF', optimize=True)
        return Image.open(fobj)
    except Exception:
        # Callers treat False as "no usable frame".
        return False
# Grab an image from the camera, and return it
def db_image_to_mem(timeout=10):
    """Fetch the current still image from the DoorBird image URL.

    Args:
        timeout: Seconds to wait for the camera before giving up, so a hung
            device cannot block the calling thread indefinitely.

    Returns:
        A BytesIO with the response body on HTTP 200, otherwise False (the
        request failed or a non-200 status was returned).
    """
    # Try to make a request to the DoorBird API Image URL to get the image.
    try:
        req = requests.get(
            DB_IMAGE_URL,
            auth=HTTPDigestAuth(DBRD_USER, DBRD_PASS),
            timeout=timeout,
        )
    except requests.RequestException as err:
        # There is no response object to inspect in this case.
        LOGGER.warning('[db_image_to_mem] API image grab error: request failed: %s', err)
        return False

    # Returns the BytesIO representation of the request response content
    if req.status_code == 200:
        return BytesIO(req.content)

    LOGGER.warning('[db_image_to_mem] API image grab error: Status code from API not 200, but was: %s', req.status_code)
    return False
# Handler for DoorBird events
def doorbird_payload_processor(payload, lastevent):
    """Decrypt and interpret one DoorBird broadcast packet.

    Args:
        payload: Hex string of the whole UDP packet (starting with 'deadbe').
        lastevent: Time string of the last event acted on, or None.

    Returns:
        A tuple ``(event_id, message)``. For a new event, ``event_id`` is the
        event time as "HH:MM:SS" and ``message`` is the Slack text. For
        anything to be ignored (unknown version, undecryptable packet, other
        intercom, duplicate) it is ``(lastevent, False)``.

    Raises:
        ValueError: If a header field or the event field is not valid hex.
    """
    version = int(payload[6:8], 16)
    # Per API spec, all events should be version 1
    if version != 1:
        # Shouldn't get here without a major API upgrade, but here just in
        # case we need to catch this change one day.
        LOGGER.warning('[doorbird_payload_processor] Received what appears to be a valid Doorbird packet, but the event version is unknown.')
        return (lastevent, False)

    # Breakout payload into components per spec.
    opslim = int(payload[8:16], 16)
    memlim = int(payload[16:24], 16)
    salt = bytes.fromhex(payload[24:56])
    nonce = bytes.fromhex(payload[56:72])
    ciphertext = bytes.fromhex(payload[72:])

    # The 'password' used to encrypt the data is actually the first five
    # characters of the password of a user account that has valid permissions
    # in the DoorBird app. A broadcast occurs for each user in the system,
    # it appears, but only those with correct permissions can decrypt their
    # respective packet.
    password = PW_FIVE.encode()
    # NOTE(review): opslim/memlim come straight from an unauthenticated
    # network packet and drive the cost of the KDF; consider bounding them.
    derivedkey = pwhash.argon2i.kdf(KEY_SIZE, password, salt, opslimit=opslim, memlimit=memlim)

    # Attempt to recover the plaintext for the event.
    try:
        plaintext = bindings.crypto_aead_chacha20poly1305_decrypt(ciphertext, None, nonce, derivedkey)
    except Exception:
        LOGGER.info('[doorbird_payload_processor] Unable to decrypt provided payload - skipping it')
        return (lastevent, False)

    # Plaintext layout as consumed here: 6 bytes intercom ID, 8 bytes
    # padded event field, 4 bytes big-endian timestamp.
    if len(plaintext) < 18:
        LOGGER.info('[doorbird_payload_processor] Decrypted payload is too short - skipping it')
        return (lastevent, False)

    # Checking here for an intercom ID match (not as necessary with a
    # single DoorBird though).
    intercom_id = plaintext[0:6].decode()
    if intercom_id != DB_ID:
        # Always hand back a tuple so the caller's unpacking never fails.
        LOGGER.debug('[doorbird_payload_processor] Ignoring event for another intercom ID')
        return (lastevent, False)

    # Get the event type, which contains a bunch of padding per the spec
    # (that we ignore/strip).
    event = int(plaintext[6:14].decode().strip(), 16)
    if event == 1:
        doorevent = ":bell: @here There is a vistor at the main door!\n\n:unlock: Unlock door if appropriate."
    else:
        doorevent = ":no_bell: The doorbell just registered an unknown event (%s)" % event

    # Pull the timestamp from the event and convert the epoch value directly
    # into Denver time, independent of the host's own timezone setting.
    epoch = int.from_bytes(plaintext[14:18], 'big')
    timestamp = datetime.fromtimestamp(epoch, tz=pytz.timezone('America/Denver'))
    eventtime = timestamp.strftime("%H:%M:%S")

    # A new event should have a different timestamp from the last event...
    if eventtime != lastevent:
        LOGGER.info('[doorbird_payload_processor] Unwrapped a new DoorBird event for handling')
        return (eventtime, doorevent)

    # ...but if it doesn't, we've probably acquired a duplicate somehow.
    LOGGER.info('[doorbird_payload_processor] Suppressing a duplicate DoorBird event')
    return (lastevent, False)
def send_slack_im(payload, channel=SLACK_TEAM_CHANNEL, token=SLACK_API_TOKEN, botuser=SLACK_BOT):
    """Post a text message to a Slack channel.

    Args:
        payload: Message text to post.
        channel: Target channel; defaults to SLACK_TEAM_CHANNEL.
        token: Slack API token; defaults to SLACK_API_TOKEN.
        botuser: Username sent with the message; defaults to SLACK_BOT.
    """
    message_args = {
        'channel': channel,
        'text': payload,
        'link_names': 1,
        'username': botuser,
        'as_user': 'true',
    }
    # A fresh client (30 second timeout) is built for every message.
    slack.WebClient(token, timeout=30).chat_postMessage(**message_args)
def post_bytes_to_channel(filestream, title, channel=SLACK_TEAM_CHANNEL, token=SLACK_API_TOKEN):
    """Upload an in-memory file to a Slack channel.

    Args:
        filestream: BytesIO holding the file content.
        title: Filename to give the upload in Slack.
        channel: Target channel(s); defaults to SLACK_TEAM_CHANNEL.
        token: Slack API token; defaults to SLACK_API_TOKEN.
    """
    # Hashing and deep-sizing the whole image only feed debug output, so
    # skip that work entirely unless debug logging is enabled.
    if LOGGER.isEnabledFor(logging.DEBUG):
        gifhash = hashlib.md5(filestream.getvalue()).hexdigest()
        LOGGER.debug('[post_bytes_to_channel] Image Hash: %s', gifhash)
        LOGGER.debug('[post_bytes_to_channel] Image Size: %sKB', round(asizeof.asizeof(filestream) / 1024, 2))

    slackclient = slack.WebClient(token, timeout=30)
    # Hand Slack a view of the buffer rather than a copy of the bytes.
    filedata = filestream.getbuffer()
    slackclient.files_upload(
        channels=channel,
        as_user='true',
        filename=title,
        file=filedata,
    )
def setup_logging():
    """Attach file and console handlers to the module LOGGER.

    Creates the log file's directory when it is missing. Prints a message
    and exits the process with status 1 if the directory cannot be created
    or the log file cannot be opened for appending.
    """
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

    logdir = os.path.dirname(os.path.realpath(LOGFILE))
    try:
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(logdir, exist_ok=True)
    except OSError:
        print("[setup_logging] Error creating log directory %s" % logdir)
        sys.exit(1)

    # File logging. FileHandler opens the file for appending immediately,
    # which doubles as the access check.
    try:
        hdlr = logging.FileHandler(LOGFILE)
    except OSError:
        print('[setup_logging] Unable to access or create the log file in specified path (%s). Exiting' % LOGFILE)
        sys.exit(1)
    hdlr.setFormatter(formatter)
    LOGGER.addHandler(hdlr)

    # Console logging
    console = logging.StreamHandler()
    console.setFormatter(formatter)
    LOGGER.addHandler(console)
# Run the sentry only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment