Last active
September 25, 2019 13:46
-
-
Save Graph-X/30751b191e23e3e961b952c8b48b1f21 to your computer and use it in GitHub Desktop.
Philips HueyLewis And The Red Green and Blues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
####################################################################### | |
# Philips Hue Automation Script # | |
# Uses DHCP log forwarding from DHCP server to figure out if the # | |
# device is on network (home) or off (away). Also uses a hold down # | |
# timer to pervent the lights being triggered every fucking time # | |
# the phone gets a new DHCP lease. Like always, I'm putting this # | |
# shit code out there for public consumption and ridicule under # | |
# the YPL. Maybe you'll find it useful or maybe not. no matter. # | |
# zero fux given # | |
####################################################################### | |
""" | |
#YOLO Public License (YPL) v0.12.34-hunter.2 | |
This software is provided as is and free of charge. It can be redesigned, redistributed, | |
refrigerated, remade, reheated, and regifted in part or in whole by any person, corporation, rodent, or wet floor sign | |
for fun or profit or hookers and blow. Marsupials, and all other inanimate objects are prohibited from using this software. | |
In exchange, just give me credit for inspiring you to steal my code like Carlos Mencia steals jokes. I steal a bunch too so you're | |
probably just getting sloppy seconds anyways. Shout out to Stack Exchange/Overflow for giving me help via shitty code snippets whenever | |
I got stuck. | |
Keep in mind I'm not a dev and can barely write good English let alone good code. This software is likely buggy as hell | |
and is provided AS IS with no warranty real, imagined, fabricated, fornicated or pulled from a magic hat that this software is | |
suitable for any purpose, porpise, or tortise, unless it's also a florist. To be honest, you probably should not even use this in any | |
environment you want to have working right. If by some miracle this code works and you see me at a con then you owe me beer or some kind | |
of booze except for smirnoff ice because that shit is fucking gross. | |
In short, you can't sue me if you decide to use this code that I'm putting out there for free and shit goes terribly wrong. I'm talking | |
deleting your root with no backups type of bad shit happenning. I'm just as broke as this code and you'll just be pissing into the wind | |
on that endeavor. I already warned you that my code is bad. Read through it and make sure you know what it does, or have your cousin | |
that took a web design class in high school help you figure out what it does. | |
...Or just take my word for it and wing it. #YOLO | |
8=====D | |
""" | |
from watchdog.observers import Observer | |
from watchdog.events import PatternMatchingEventHandler | |
from watchdog.events import FileSystemEventHandler | |
import logging | |
import time | |
import pyping | |
import pyhue | |
from datetime import datetime | |
#two hour hold down. Timer | |
MAX_TIMEOUT=7200 | |
""" | |
MAC_ADDRESSES is a dictionary with the mac address as the key. | |
The value is an array of boolean, float or None, and IP address. | |
This is used to track activity and helps determine when the phone has been | |
off network for more than the MAX_TIMEOUT | |
""" | |
### CUSTOMIZABLE VARIABLES ### | |
# {mac address:[status,status change time,IP address]} | |
MAC_ADDRESSES = { | |
"ab:cd:ef:00:01:02":[False,None,"0.0.0.0"], | |
"ab:cd:ef:00:01:02":[False,None,"0.0.0.0"], | |
"ab:cd:ef:00:01:02":[False,None,"0.0.0.0"] | |
} | |
#Watch this folder for file changes. We are only concerned with the LOGFILE file though. | |
PATH = "/var/log" | |
LOGFILE = "pfsense.log" | |
#Location of the log file for this script. | |
HUEYLOG = "/var/log/watcher.log" | |
#Hue Bridge creds. | |
#todo: Change the hostname of the hue to | |
BRIDGE = "127.0.0.1" | |
BUSER = "ENTERYOURUSERTOKENHERE" | |
### END OF CUSTOMIZABLE VARIABLES ### | |
#logging function. Now we can be alerted when our shitty code falls on its face | |
def Logger(): | |
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(name)s %(levelname)s %(message)s') | |
logger = logging.getLogger(__name__) | |
logging.getLogger("watchdog").setLevel(logging.INFO) | |
handler = logging.FileHandler(HUEYLOG) | |
logger.addHandler(handler) | |
return logger | |
#Main function. Only here to call the watcher | |
def main(): | |
watcher = Watcher() | |
watcher.run() | |
#turn lights on or off depending on the boolean value of val | |
def turn(val): | |
if val: | |
logger.debug("Turning the lights on!") | |
else: | |
logger.debug("Turning the lights off!") | |
b = pyhue.Bridge(BRIDGE,BUSER) | |
b.groups[3].on = val | |
b.groups[1].on = val | |
#watcher class to look for changes to the log file | |
class Watcher(): | |
def __init__(self): | |
logger.info("Initiating Watcher...") | |
self.observer = Observer() | |
self.t = 0 | |
def run(self): | |
new_dict = {} | |
global MAC_ADDRESSES | |
f = open(PATH + "/" + LOGFILE) | |
mac_addy = MAC_ADDRESSES | |
for line in reversed(f.readlines()): | |
logger.debug("We have {0} devices to reconcile".format(str(len(mac_addy)))) | |
if len(mac_addy) != 0: | |
if "DHCPOFFER" in line: | |
for k in mac_addy.keys(): | |
if k in line: | |
new_dict[k] = [False,None,""] | |
logger.debug("Found IP for {0} ==> {1}".format(k,line.split(' ')[7])) | |
new_dict[k][2] = line.split(' ')[7] | |
if mac_addy[k][1] == None: | |
r = pyping.ping(new_dict[k][2]) | |
if r.ret_code == 0: | |
new_dict[k][1] = time.time() | |
new_dict[k][0] = True | |
else: | |
#Set time to some time that is ridiculously far in the past | |
new_dict[k][1] = 1.0000 | |
new_dict[k][0] = False | |
#pop the device off the dict so we don't duplicate assignment | |
mac_addy.pop(k) | |
else: | |
logger.debug("All devices have been assigned last known good IP {}".format(str(new_dict))) | |
# set MAC_ADDRESSES to the new values | |
MAC_ADDRESSES = new_dict | |
#This can be removed once debugging is done | |
logger.debug(str(MAC_ADDRESSES)) | |
#for macs in MAC_ADDRESSES: | |
#logger.debug()(macs + " ==> " + str(MAC_ADDRESSES[macs])) | |
#logger.debug("all devices accounted for") | |
break | |
logger.debug("We are at the last line of the log file.") | |
#instantiate the handler, kick off the observer and loop forever | |
event_handler = Handler(f) | |
self.observer.schedule(event_handler,PATH,recursive = False) | |
self.observer.start() | |
try: | |
while True: | |
time.sleep(1) | |
self.t += 1 | |
if self.t == MAX_TIMEOUT: | |
logger.debug("two hour checkin.") | |
self.t = 0 | |
#check last log time for each device. | |
for mac in MAC_ADDRESSES: | |
logger.debug("Checking Device with MAC: {0}".format(mac)) | |
info = MAC_ADDRESSES[mac] | |
if info[1] is not None: | |
#if device hasn't checked in 2 hours | |
if (info[1] - time.time()) > MAX_TIMEOUT: | |
logger.debug("mac: {0} info: {1}".format(str(mac), str(info))) | |
n = time.time() | |
logger.debug("the difference is: {0}".format(str(n - info[1]))) | |
logger.debug("Attempting to ping: {}".format(info[2])) | |
r = pyping.ping(info[2]) | |
if r.ret_code == 1: | |
#no response received, are we listed as home? | |
if MAC_ADDRESSES[mac][0] == True: | |
logger.debug("Ping timeout!") | |
MAC_ADDRESSES[mac][0] = False | |
turn(MAC_ADDRESSES[mac][0]) | |
else: | |
logger.debug("No need to constantly turn the lights off here.") | |
else: | |
logger.debug("Ping response received. Setting new time ({0}) and home (True) values".format(str(time.time()))) | |
MAC_ADDRESSES[mac][0] = True | |
MAC_ADDRESSES[mac][1] = time.time() | |
except Exception as e: | |
self.observer.stop() | |
logger.error("Error: {}".format(str(e))) | |
exit | |
except KeyboardInterrupt(): | |
self.observer.stop() | |
exit | |
class Handler(FileSystemEventHandler): | |
def __init__(self,f): | |
self.f = f | |
def on_modified(self,event): | |
f = self.f | |
if event.src_path.endswith(LOGFILE): | |
global MAC_ADDRESSES | |
line = f.readline() | |
logger.debug("current log file line: {}".format(str(line))) | |
if line.split(' ')[5] == "DHCPDISCOVER": | |
mac = line.split(' ')[7] | |
now = time.time() | |
if MAC_ADDRESSES[mac][0] == False: | |
logger.debug('new DHCPDISCOVER!') | |
logger.debug("The time is now: {}".format(str(now))) | |
#if it's been more than an hour since the phone has made a DHCPDISCOVER request | |
if MAC_ADDRESSES[mac][1] is not None: | |
diff = now - MAC_ADDRESSES[mac][1] | |
logger.debug("It has been {} seconds since the last DHCPDISCOVER request for this mac".format(diff)) | |
if (now - MAC_ADDRESSES[mac][1]) > MAX_TIMEOUT: | |
MAC_ADDRESSES[mac][0] = True | |
MAC_ADDRESSES[mac][1] = now | |
turn(MAC_ADDRESSES[mac][0]) | |
else: | |
logger.debug('device wasn\'t gone long enough. Must wait an hour between notifications') | |
else: | |
logger.debug("{0} has been seen for the first time. Turning lights on.".format(mac)) | |
MAC_ADDRESSES[mac][1] = now | |
MAC_ADDRESSES[mac][0] = True | |
turn(MAC_ADDRESSES[mac][0]) | |
else: | |
if (now - MAC_ADDRESSES[mac][1]) > MAX_TIMEOUT: | |
logger.debug("Device was offline for longer than {0}. Turning lights on.".format(str(MAX_TIMEOUT))) | |
MAC_ADDRESSES[mac][0] = True | |
turn(MAC_ADDRESSES[mac][0]) | |
MAC_ADDRESSES[mac][1] = now | |
if line.split(' ')[5] == "DHCPOFFER": | |
logger.debug('DHCPOFFER!') | |
mac = line.split(' ')[9] | |
ip = line.split(' ')[7] | |
if MAC_ADDRESSES[mac][2] != ip: | |
logger.debug("Updating IP address for {0} to {1}".format(mac,ip)) | |
MAC_ADDRESSES[mac][2] = ip | |
if __name__ == "__main__": | |
logger = Logger() | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment