Skip to content

Instantly share code, notes, and snippets.

@simons-public
Last active June 7, 2025 05:10
Show Gist options
  • Save simons-public/456d601cf733446a41f06d4980830217 to your computer and use it in GitHub Desktop.
Save simons-public/456d601cf733446a41f06d4980830217 to your computer and use it in GitHub Desktop.
Kodi CEC Sharing

Kodi CEC sharing

Silly/hacky combination of libcec patch to write messages to /tmp/cec.log and a python parser to allow using cec messages to do other actions not avaiable in Kodi (i.e. turn off HyperHDR leds)

This is a quick hacky patch and is not polished. at. all.

--- a/src/libcec/LibCEC.cpp
+++ b/src/libcec/LibCEC.cpp
@@ -421,6 +421,27 @@
bool CLibCEC::CommandHandlerCB(const cec_command &command)
{
+// --- begin log injection ---
+ fprintf(stderr, "[libcec] CommandHandlerCB called\n");
+FILE *fp = fopen("/tmp/cec.log", "a");
+if (fp) {
+ fprintf(fp, "CEC: %x -> %x opcode: 0x%02x",
+ command.initiator,
+ command.destination,
+ command.opcode);
+
+ if (command.parameters.size > 0) {
+ fprintf(fp, " params:");
+ for (uint8_t i = 0; i < command.parameters.size; ++i) {
+ fprintf(fp, " %02x", command.parameters.data[i]);
+ }
+ }
+
+ fprintf(fp, "\n");
+ fclose(fp);
+}
+// --- end log injection ---
+
// send the command to all clients
for (std::vector<CECClientPtr>::iterator it = m_clients.begin(); it != m_clients.end(); it++)
if ((*it)->QueueCommandHandler(command))
import cec
import re
import time
import os
import copy
from collections import defaultdict
import requests
# Build opcode maps
OPCODE_MAP = {
v: k for k, v in vars(cec).items()
if k.startswith("CEC_OPCODE_") and isinstance(v, int)
}
REVERSE_OPCODE_MAP = {
k.replace("CEC_OPCODE_", ""): v for v, k in OPCODE_MAP.items()
}
LOG_LINE_REGEX = re.compile(
r"CEC: ([0-9a-f]) -> ([0-9a-f]) opcode: 0x([0-9a-f]{2})(?: params:(.*))?",
re.IGNORECASE
)
def hex_str_to_bytes(s):
return [int(x, 16) for x in s.strip().split()] if s else []
def logical_address_name(addr):
try:
return cec.LogicalAddress(addr).name
except Exception:
return f"Device_{addr}"
class CECMessage:
def __init__(self, initiator, destination, opcode, params):
self.initiator = initiator
self.destination = destination
self.opcode = opcode
self.params = params
self.opcode_name = OPCODE_MAP.get(opcode, f"UNKNOWN_0x{opcode:02X}")
self.src_name = logical_address_name(initiator)
self.dst_name = logical_address_name(destination)
def __str__(self):
return f"{self.src_name} -> {self.dst_name} | {self.opcode_name} | params: {self.params}"
class CECMonitor:
def __init__(self, log_path="/tmp/cec.log"):
self.log_path = log_path
self.callbacks = defaultdict(list)
def on(self, opcode):
"""Register a callback for a specific opcode (name or int)"""
if isinstance(opcode, str):
opcode = REVERSE_OPCODE_MAP.get(opcode.upper())
if opcode is None:
raise ValueError("Unknown opcode")
def decorator(func):
self.callbacks[opcode].append(func)
return func
return decorator
def parse_line(self, line):
match = LOG_LINE_REGEX.match(line)
if not match:
return None
initiator = int(match.group(1), 16)
destination = int(match.group(2), 16)
opcode = int(match.group(3), 16)
params = hex_str_to_bytes(match.group(4))
return CECMessage(initiator, destination, opcode, params)
def run(self):
print(f"Following {self.log_path}... Ctrl+C to exit.")
with open(self.log_path, "r") as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if not line:
time.sleep(0.1)
continue
msg = self.parse_line(line)
if msg:
print(msg)
for cb in self.callbacks.get(msg.opcode, []):
cb(msg)
else:
print(f"Unparsed line: {line.strip()}")
cec_monitor = CECMonitor()
class HyperHDRController:
def __init__(self, host='localhost', port=8090):
self.url = f'http://{host}:{port}/json-rpc'
self.session = requests.Session()
self.tan = 1
def _send(self, payload, retries=3):
responses = []
for _ in range(retries):
request_payload = copy.deepcopy(payload)
request_payload['tan'] = self.tan
self.tan += 1
try:
response = self.session.post(self.url, json=request_payload)
response.raise_for_status()
result = response.json()
responses.append(result)
if result.get("success"):
break
except requests.RequestException as e:
responses.append({"success": False, "error": str(e)})
return responses
def power_off(self):
return self._send({
"command": "componentstate",
"componentstate": {
"component": "LEDDEVICE",
"state": False
}
})
def power_on(self):
return self._send({
"command": "componentstate",
"componentstate": {
"component": "LEDDEVICE",
"state": True
}
})
hyper_hdr = HyperHDRController()
@cec_monitor.on("VENDOR_COMMAND_WITH_ID")
def handle_vendor_command(msg: CECMessage):
#print("💥 Vendor command received!")
#print(msg)
if msg.params == [8, 0, 70, 0, 9, 0, 1]:
print("TV Powered off")
hyper_hdr.power_off()
if msg.params == [8, 0, 70, 0, 12, 0, 255]:
print("TV Powered On")
hyper_hdr.power_on()
@cec_monitor.on("SET_STREAM_PATH")
def handle_stream_path(msg: CECMessage):
print("🎬 Stream path updated:")
print(msg)
@cec_monitor.on("USER_CONTROL_PRESSED")
def handle_keypress(msg: CECMessage):
if msg.params:
keycode = msg.params[0]
print(f"🔘 Key press detected: code={keycode:02X}")
if __name__ == "__main__":
cec_monitor.run()
[Unit]
Description=CEC Log Parser
After=network.target kodi-x11.service
[Service]
Type=simple
WorkingDirectory=/usr/local/scripts
ExecStart=/usr/bin/python3 -u ceclog_parser.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
diff --git a/PKGBUILD b/PKGBUILD
index 4682262..1b4166d 100644
--- a/PKGBUILD
+++ b/PKGBUILD
@@ -15,6 +15,12 @@ optdepends=('python: use cec in python applications')
source=("$pkgname-$pkgver.tar.gz::https://github.com/Pulse-Eight/$pkgname/archive/$pkgname-$pkgver.tar.gz")
sha256sums=('7f9e57ae9fad37649adb6749b8f1310a71ccf3e92ae8b2d1cc9e8ae2d1da83f8')
+prepare() {
+ cd "$pkgname-$pkgname-$pkgver"
+ file src/libcec/LibCEC.cpp
+ patch -p1 < ../../ceclog.patch
+}
+
build() {
cd "$pkgname-$pkgname-$pkgver"
mkdir build
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment