Last active
August 10, 2021 13:50
-
-
Save cassc/5cfa37e73b567e91154a5afe86768936 to your computer and use it in GitHub Desktop.
Control Ledou / keelight through TCP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Ledou LED Light / Keelight | |
# Search gateway and lights and show sample colors | |
import socket | |
import time | |
import traceback | |
import sys | |
from itertools import cycle | |
import binascii | |
PORT = 41330 | |
LED_SCAN_DEVICE_ID_START = 20 | |
def search_gateway(): | |
s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
s.bind(('',41328)) | |
while True: | |
m, addr =s.recvfrom(1024) | |
msg = m.hex() | |
print(f'Recv: {msg}') | |
if msg.startswith('f3c071'): | |
print(f'Found gateway: {addr}') | |
return addr[0] | |
def cmd_brightness(lightId, br=80, r=255, g=255, b=255): | |
val = int(br / 100 * 255) | |
msg = f'f2c2ffffffff00001d0c00000071{lightId}{val:02x}{r:02x}{g:02x}{b:02x}' | |
return bytes.fromhex(msg) | |
def cmd_off(): | |
return cmd_brightness(0) | |
def make_socket(ip, port=PORT): | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.connect((ip, PORT)) | |
sock.settimeout(5) | |
return sock | |
ip = search_gateway() | |
lightIds = set() | |
def scan_light(sock, timeout=10): | |
cmd = bytes.fromhex('f3c77152bb0800001d050000007100') | |
sock.sendall(cmd) | |
# f3c77152bb08000020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 | |
data = sock.recv(1024) | |
lightsDataLen = data[9] | |
numLights = lightsDataLen // 5 | |
print(f"Found {numLights} lights") | |
offset = 10 | |
endOffset = 10 + lightsDataLen | |
while offset < endOffset: | |
lightId = data[offset:offset+4].hex() | |
lightIds.add(lightId) | |
print(f'Found light: {lightId}') | |
offset += 5 | |
# Sample brightness for testing | |
def make_sample_colors(lid): | |
return [ | |
cmd_brightness(lid, 100, 255, 0, 0), | |
cmd_brightness(lid, 80, 0, 255, 0), | |
cmd_brightness(lid, 60, 0, 0, 255), | |
cmd_brightness(lid, 40), | |
cmd_brightness(lid, 20), | |
cmd_brightness(lid, 00), | |
] | |
if not ip: | |
print(f'Gateway not found!') | |
sys.exit(0) | |
sock = make_socket(ip) | |
# searching for lights | |
start = time.time() | |
while time.time() - start < 5: | |
try: | |
scan_light(sock) | |
except socket.timeout as e: | |
pass | |
if len(lightIds) < 1: | |
print('No lights found!') | |
sys.exit(0) | |
BRS = [] | |
for lid in lightIds: | |
BRS += list(make_sample_colors(lid)) | |
for msg in cycle(BRS): | |
try: | |
sock.sendall(msg) | |
data = sock.recv(1024) | |
print("received message: %s" % data) | |
time.sleep(2) | |
except Exception as e: | |
if type(e) == socket.timeout: | |
print('connection lost, retry in 5 secs ...') | |
time.sleep(5) | |
sock = make_socket(ip) | |
else: | |
traceback.print_exc(file=sys.stdout) | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment