Last active
February 27, 2017 15:14
-
-
Save bkbilly/a1b8b6b8768273969777b7e2db44f89e to your computer and use it in GitHub Desktop.
Flash a Yeelight bulb for a set time when a Hikvision camera detects a line-crossing event.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import requests | |
import re | |
import threading | |
import time | |
import socket | |
import json | |
from datetime import datetime | |
import pytz | |
class LogFile:
    ''' Append timestamped messages to a plain-text log file.

    Every entry has the form `[YYYY-MM-DD HH:MM:SS] message`. The timestamp
    is taken in the configured pytz timezone when a valid one was given,
    otherwise from the naive local clock.
    '''

    def __init__(self, logfile, timezone=False, debug=False):
        ''' Initialize the LogFile class.

        Arguments:
         - logfile: Path of the file that entries are appended to
        Optional arguments:
         - timezone: A timezone name as listed in `pytz.all_timezones`;
           False (the default) keeps the naive local time
         - debug: When True, every entry is also printed to stdout
        '''
        self.logfile = logfile
        self.mytimezone = False
        self.debug = debug
        if timezone is not False:
            try:
                self.mytimezone = pytz.timezone(timezone)
            except Exception:
                # Best effort: a bad timezone name must not stop logging, so
                # fall back to local time. `Exception` (rather than a bare
                # except) lets KeyboardInterrupt / SystemExit propagate.
                self.writeLog("Timezone is not correct.")
                self.mytimezone = False

    def writeLog(self, message):
        ''' Append one timestamped entry to the log file.

        `message` is converted with str(), so exception objects and other
        non-string values can be logged directly. When `debug` is True the
        same entry is printed to stdout.
        '''
        if self.mytimezone is False:
            now = datetime.now()
        else:
            now = datetime.now(tz=self.mytimezone)
        entry = now.strftime("[%Y-%m-%d %H:%M:%S] ") + str(message)
        with open(self.logfile, "a") as myfile:
            myfile.write(entry + "\n")
        if self.debug is True:
            print(entry)

    def getLog(self, limit):
        ''' Return the last `limit` lines of the log file as a list.

        A `limit` of zero or less yields an empty list. Errors from opening
        the file (for example when it does not exist yet) propagate to the
        caller.
        '''
        with open(self.logfile, "r") as f:
            lines = f.readlines()
        # `lines[-0:]` is the whole list and a negative limit would drop the
        # head instead of taking a tail, so both cases are handled explicitly.
        if limit <= 0:
            return []
        return lines[-limit:]
class Yeelight:
    ''' Small LAN client for one Yeelight bulb that can flash an alert.

    Every command is sent as one JSON line over its own short-lived TCP
    connection to the bulb. `ring` flashes the bulb, restores its previous
    state and keeps the `ringing` flag set for a cool-down period so that
    callers can skip events that arrive while an alert is still active.
    '''

    def __init__(self, bulb_ip, secAlert, secWait):
        ''' Store the bulb address and the alert timings.

        Arguments:
         - bulb_ip: IP address of the bulb
         - secAlert: Seconds the flashing colour flow is left running
         - secWait: Seconds to stay in the ringing state after the flash
        '''
        self.bulb_ip = bulb_ip
        self.secAlert = secAlert
        self.secWait = secWait
        # TCP port every command is sent to.
        self.bulb_port = 55443
        self.ringing = False
        self.log = LogFile('bulbHikvision.log', timezone='Europe/Athens', debug=True)

    def ring(self):
        ''' Flash the bulb, restore its previous state, then cool down.

        Blocks for `secAlert + secWait` seconds, so it is normally run
        through `ringThread`. The ringing flag is always cleared on exit,
        even when talking to the bulb fails; otherwise a single error would
        suppress every later alert.
        '''
        self.ringing = True
        self.log.writeLog("start ringing")
        try:
            power, sceneParams = self.getState()
            # Colour flow: count 0 / action 0, then tuples of
            # "duration_ms,mode,value,brightness" cycling red (16711680),
            # green (65280) and blue (255), per the Yeelight flow format.
            message = {"id": 1, "method": "set_scene", "params": ["cf", 0, 0, "200,1,16711680,20, 200,1,65280,20, 200,1,255,20"]}
            self.connect(message, self.bulb_ip, self.bulb_port)
            time.sleep(self.secAlert)
            if sceneParams:
                message = {"id": 1, "method": "set_scene", "params": sceneParams}
            else:
                # The previous scene is unknown (unreadable state or a colour
                # mode getState does not map), so just stop the flow instead
                # of sending a set_scene command with empty parameters.
                message = {"id": 1, "method": "stop_cf", "params": []}
            self.connect(message, self.bulb_ip, self.bulb_port)
            if power == 'on':
                self.turnOn()
            elif power == 'off':
                self.turnOff()
            time.sleep(self.secWait)
        finally:
            self.ringing = False
            self.log.writeLog("stop ringing")

    def ringThread(self):
        ''' Start `ring` in a background thread and return immediately. '''
        # Raise the flag before the thread runs so that a second event seen
        # by the caller right after this call cannot start a parallel flash.
        self.ringing = True
        try:
            threading.Thread(target=self.ring).start()
        except Exception:
            self.ringing = False
            raise

    def isRinging(self):
        ''' Return True while an alert (flash plus cool-down) is active. '''
        return self.ringing

    def turnOff(self):
        ''' Switch the bulb off with a 500 ms smooth transition. '''
        message = {"id": 1, "method": "set_power", "params": ["off", "smooth", 500]}
        self.connect(message, self.bulb_ip, self.bulb_port)

    def turnOn(self):
        ''' Switch the bulb on with a 500 ms smooth transition. '''
        message = {"id": 1, "method": "set_power", "params": ["on", "smooth", 500]}
        self.connect(message, self.bulb_ip, self.bulb_port)

    def getState(self):
        ''' Read the bulb state needed to restore it after an alert.

        Returns a tuple `(power, sceneParams)`:
         - power: the reported power value ('on' / 'off'), or None when the
           bulb could not be queried
         - sceneParams: parameters for a `set_scene` command that restores
           the current colour or colour temperature; an empty list when the
           state is unknown or the colour mode is neither '1' nor '2'
        '''
        message = {"id": 1, "method": "get_prop", "params": ["power", "color_mode", "bright", "ct", "rgb"]}
        state = self.connect(message, self.bulb_ip, self.bulb_port)
        if not state:
            return None, []

        # The reply is line based; pick the first line that carries a
        # "result" and ignore anything else received in the same read.
        result = None
        for line in state.splitlines():
            try:
                reply = json.loads(line)
            except ValueError:
                continue
            if isinstance(reply, dict) and "result" in reply:
                result = reply["result"]
                break
        if not isinstance(result, list) or len(result) < 5:
            self.log.writeLog("Unexpected get_prop reply: " + repr(state))
            return None, []

        power = result[0]
        sceneParams = []
        try:
            if result[1] == '1':
                sceneParams = ["color", int(result[4]), int(result[2])]
            elif result[1] == '2':
                sceneParams = ["ct", int(result[3]), int(result[2])]
        except (TypeError, ValueError):
            # A property came back empty or non-numeric; treat the scene as
            # unknown rather than failing the whole alert.
            sceneParams = []
        return power, sceneParams

    def connect(self, command, bulb_ip, bulb_port, timeout=5):
        ''' Send one JSON command to the bulb and return its raw reply.

        Arguments:
         - command: dict that is serialized to JSON and sent as one line
         - bulb_ip, bulb_port: address of the bulb
        Optional arguments:
         - timeout: seconds to wait for connecting, sending and the reply

        Returns the reply decoded as text (at most 1024 bytes are read), or
        None when anything went wrong; the error is written to the log.
        '''
        tcp_socket = None
        try:
            tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            tcp_socket.settimeout(timeout)
            msg = json.dumps(command) + "\r\n"
            tcp_socket.connect((bulb_ip, int(bulb_port)))
            # Sockets carry bytes; encoding explicitly keeps this working on
            # Python 3 as well as Python 2.
            tcp_socket.sendall(msg.encode("utf-8"))
            data = tcp_socket.recv(1024)
            return data.decode("utf-8")
        except Exception as e:
            # Deliberately broad: this usually runs in the alert thread and a
            # bulb hiccup must be logged, not raised.
            self.log.writeLog("Unexpected error: " + str(e))
            return None
        finally:
            if tcp_socket is not None:
                tcp_socket.close()
# Listen to the Hikvision alert stream and flash the bulb on line-crossing.
# NOTE(review): the credentials and both IP addresses are hard-coded
# placeholders; replace them (ideally from configuration) before use.
authorization = requests.auth.HTTPBasicAuth('username', 'password')
mylight = Yeelight("192.168.1.16", 1.5, 5)

# Compiled once; applied to each line of the stream.
eventPattern = re.compile(r'<eventType>(.*)</eventType>')
# Pause before reconnecting so a dropped or rejected stream does not turn
# the outer loop into a tight request loop against the camera.
reconnectDelay = 5

try:
    while True:
        response = None
        try:
            response = requests.get('http://192.168.1.15/ISAPI/Event/notification/alertStream',
                                    auth=authorization,
                                    stream=True)
            for chunk in response.iter_lines():
                if not chunk:
                    continue
                # iter_lines yields bytes; decode so the str pattern can be
                # applied on Python 3 as well as Python 2.
                if isinstance(chunk, bytes):
                    chunk = chunk.decode('utf-8', 'replace')
                match = eventPattern.match(chunk)
                if match and match.group(1) == 'linedetection':
                    # Events that arrive during an active alert are skipped.
                    if mylight.isRinging() is False:
                        mylight.ringThread()
        except requests.exceptions.RequestException as e:
            # Camera unreachable or stream broken: log it and reconnect.
            mylight.log.writeLog("Camera stream error: " + str(e))
        finally:
            if response is not None:
                response.close()
        time.sleep(reconnectDelay)
except KeyboardInterrupt:
    print('')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment