|
import cec |
|
import re |
|
import time |
|
import os |
|
import copy |
|
from collections import defaultdict |
|
import requests |
|
|
|
# Opcode lookup tables derived from the integer ``CEC_OPCODE_*`` attributes
# exposed by the ``cec`` module.

# Integer opcode value -> full attribute name (still carrying "CEC_OPCODE_").
OPCODE_MAP = {
    value: name
    for name, value in vars(cec).items()
    if name.startswith("CEC_OPCODE_") and isinstance(value, int)
}

# Short opcode name (prefix removed) -> integer opcode value.
REVERSE_OPCODE_MAP = {
    name.replace("CEC_OPCODE_", ""): value
    for value, name in OPCODE_MAP.items()
}
|
|
|
# Matches one log entry from the start of the line (case-insensitively).
# Groups: 1 = initiator hex digit, 2 = destination hex digit,
# 3 = two-digit hex opcode, 4 = raw text after "params:" (None when absent).
LOG_LINE_REGEX = re.compile(
    r"CEC: ([0-9a-f]) -> ([0-9a-f]) "
    r"opcode: 0x([0-9a-f]{2})"
    r"(?: params:(.*))?",
    flags=re.IGNORECASE,
)
|
|
|
def hex_str_to_bytes(s):
    """Convert whitespace-separated hex tokens into a list of ints.

    Returns an empty list when *s* is ``None`` or empty. A token that is not
    valid base-16 makes ``int`` raise ``ValueError``.
    """
    if not s:
        return []
    # split() with no argument already ignores leading/trailing whitespace.
    return [int(token, 16) for token in s.split()]
|
|
|
def logical_address_name(addr):
    """Return a display name for logical address *addr*.

    Uses ``cec.LogicalAddress(addr).name`` when that lookup succeeds; on any
    exception falls back to ``"Device_<addr>"``.
    """
    try:
        address = cec.LogicalAddress(addr)
        label = address.name
    except Exception:
        # Best-effort naming: any lookup failure yields the generic label.
        label = f"Device_{addr}"
    return label
|
|
|
class CECMessage:
    """One parsed CEC message plus display names for its fields."""

    def __init__(self, initiator, destination, opcode, params):
        """Store the raw fields and resolve their display names.

        ``opcode_name`` comes from ``OPCODE_MAP`` (``UNKNOWN_0x..`` when the
        opcode is not listed); ``src_name`` / ``dst_name`` come from
        ``logical_address_name``.
        """
        unknown_label = f"UNKNOWN_0x{opcode:02X}"

        self.initiator = initiator
        self.src_name = logical_address_name(initiator)

        self.destination = destination
        self.dst_name = logical_address_name(destination)

        self.opcode = opcode
        self.opcode_name = OPCODE_MAP.get(opcode, unknown_label)

        self.params = params

    def __str__(self):
        """Render as ``<src> -> <dst> | <opcode name> | params: <params>``."""
        route = f"{self.src_name} -> {self.dst_name}"
        payload = f"params: {self.params}"
        return " | ".join((route, self.opcode_name, payload))
|
|
|
class CECMonitor:
    """Follow a CEC log file and dispatch parsed messages to callbacks.

    Callbacks are registered per opcode with the :meth:`on` decorator and are
    called, in registration order, with the parsed ``CECMessage``.
    """

    def __init__(self, log_path="/tmp/cec.log"):
        """Remember *log_path*; the file is only opened by :meth:`run`."""
        self.log_path = log_path
        # Integer opcode -> list of callables taking one CECMessage.
        self.callbacks = defaultdict(list)

    def on(self, opcode):
        """Return a decorator registering a callback for *opcode*.

        Args:
            opcode: Integer opcode, or an opcode name that is upper-cased and
                looked up in ``REVERSE_OPCODE_MAP``.

        Raises:
            ValueError: If the name is not in ``REVERSE_OPCODE_MAP`` (or the
                opcode is ``None``).
        """
        if isinstance(opcode, str):
            opcode = REVERSE_OPCODE_MAP.get(opcode.upper())
        if opcode is None:
            raise ValueError("Unknown opcode")

        def decorator(func):
            self.callbacks[opcode].append(func)
            # Hand the function back unchanged so it stays usable by name.
            return func

        return decorator

    def parse_line(self, line):
        """Parse one log line into a ``CECMessage``.

        Returns ``None`` when the line does not match ``LOG_LINE_REGEX`` or
        when the text after ``params:`` is not whitespace-separated hex.
        """
        match = LOG_LINE_REGEX.match(line)
        if not match:
            return None

        try:
            params = hex_str_to_bytes(match.group(4))
        except ValueError:
            # The regex accepts any trailing text as params; a non-hex token
            # means the line is unparseable, not that monitoring should stop.
            return None

        initiator = int(match.group(1), 16)
        destination = int(match.group(2), 16)
        opcode = int(match.group(3), 16)
        return CECMessage(initiator, destination, opcode, params)

    def _complete_lines(self, f):
        """Yield newline-terminated lines from *f* forever, polling at EOF.

        ``readline()`` at the end of a growing file can return a fragment of
        a line that is still being written; fragments are buffered until the
        terminating newline shows up so callers never see a truncated line.
        """
        pending = ""
        while True:
            chunk = f.readline()
            if not chunk:
                # Nothing new yet; wait briefly before polling again.
                time.sleep(0.1)
                continue
            pending += chunk
            if pending.endswith("\n"):
                yield pending
                pending = ""

    def run(self):
        """Follow the log from its current end and dispatch each message.

        Every parsed message is printed and passed to the callbacks
        registered for its opcode; lines that cannot be parsed are reported
        as ``Unparsed line``. Runs until interrupted.
        """
        print(f"Following {self.log_path}... Ctrl+C to exit.")
        with open(self.log_path, "r") as f:
            # Skip existing content; only react to lines appended from now on.
            f.seek(0, os.SEEK_END)
            for line in self._complete_lines(f):
                msg = self.parse_line(line)
                if msg is None:
                    print(f"Unparsed line: {line.strip()}")
                    continue

                print(msg)
                for cb in self.callbacks.get(msg.opcode, []):
                    cb(msg)
|
|
|
# Module-level monitor (default log path); handlers register on it through
# the @cec_monitor.on(...) decorator.
cec_monitor: CECMonitor = CECMonitor()
|
|
|
|
|
class HyperHDRController:
    """Small JSON-RPC client that switches the ``LEDDEVICE`` component."""

    def __init__(self, host='localhost', port=8090, timeout=5):
        """Prepare a session for ``http://<host>:<port>/json-rpc``.

        Args:
            host: Host name or address of the JSON-RPC server.
            port: TCP port of the JSON-RPC server.
            timeout: Seconds to wait for each HTTP request before it is
                counted as a failed attempt (passed to ``requests``).
        """
        self.url = f'http://{host}:{port}/json-rpc'
        self.session = requests.Session()
        # Counter sent as the "tan" field; incremented for every attempt.
        self.tan = 1
        self.timeout = timeout

    def _send(self, payload, retries=3):
        """POST *payload* up to *retries* times and collect every outcome.

        Each attempt sends a copy of *payload* with a fresh ``tan`` value.
        Attempts stop after the first response whose ``success`` field is
        truthy.

        Returns:
            A list with one entry per attempt: the decoded JSON response, or
            ``{"success": False, "error": <message>}`` when the request
            failed, timed out, or returned a body that is not JSON.
        """
        responses = []
        for _ in range(retries):
            # Copy so the caller's payload never picks up the "tan" field.
            request_payload = copy.deepcopy(payload)
            request_payload['tan'] = self.tan
            self.tan += 1
            try:
                # Without a timeout a stalled server would block forever.
                response = self.session.post(
                    self.url, json=request_payload, timeout=self.timeout
                )
                response.raise_for_status()
                result = response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a non-JSON body on requests versions whose
                # decode error is not a RequestException subclass.
                responses.append({"success": False, "error": str(e)})
                continue

            responses.append(result)
            if result.get("success"):
                break
        return responses

    def _set_led_state(self, enabled):
        """Send a ``componentstate`` command setting ``LEDDEVICE`` to *enabled*."""
        return self._send({
            "command": "componentstate",
            "componentstate": {
                "component": "LEDDEVICE",
                "state": enabled,
            },
        })

    def power_off(self):
        """Disable the LED device; returns the list produced by ``_send``."""
        return self._set_led_state(False)

    def power_on(self):
        """Enable the LED device; returns the list produced by ``_send``."""
        return self._set_led_state(True)
|
|
|
# Module-level controller built with the constructor defaults; used by the
# CEC handlers in this file.
hyper_hdr: HyperHDRController = HyperHDRController()
|
|
|
|
|
@cec_monitor.on("VENDOR_COMMAND_WITH_ID")
def handle_vendor_command(msg: CECMessage):
    """Switch the HyperHDR LED device when a known vendor payload arrives.

    ``msg.params`` is compared for exact equality with two fixed payloads;
    anything else is ignored.
    NOTE(review): the shared 08 00 46 prefix is presumably the vendor ID and
    the rest vendor-specific data - confirm against the TV's actual traffic.
    """
    tv_off_payload = [0x08, 0x00, 0x46, 0x00, 0x09, 0x00, 0x01]
    tv_on_payload = [0x08, 0x00, 0x46, 0x00, 0x0C, 0x00, 0xFF]

    # (payload to match, message to print, controller action to run)
    reactions = (
        (tv_off_payload, "TV Powered off", hyper_hdr.power_off),
        (tv_on_payload, "TV Powered On", hyper_hdr.power_on),
    )
    for payload, notice, switch in reactions:
        if msg.params == payload:
            print(notice)
            switch()
|
|
|
|
|
@cec_monitor.on("SET_STREAM_PATH")
def handle_stream_path(msg: CECMessage):
    """Print a heading followed by the message on its own line."""
    print("🎬 Stream path updated:", msg, sep="\n")
|
|
|
@cec_monitor.on("USER_CONTROL_PRESSED")
def handle_keypress(msg: CECMessage):
    """Print the first parameter byte as a two-digit hex key code.

    Messages without parameters are ignored.
    """
    if not msg.params:
        return
    keycode = msg.params[0]
    print(f"🔘 Key press detected: code={keycode:02X}")
|
|
|
if __name__ == "__main__":
    # Script entry point: follow the log until the user interrupts.
    try:
        cec_monitor.run()
    except KeyboardInterrupt:
        # run() advertises "Ctrl+C to exit"; leave without a traceback.
        print("Stopped.")