Last active
February 16, 2024 21:21
-
-
Save HNJAMeindersma/36583dde8e0eb8e97e2cff2e7d9d2836 to your computer and use it in GitHub Desktop.
The goal is to control switching HDMI sources on my TV; unfortunately my TV's API is hardly working, so I need HDMI-CEC. LibreELEC-11.0 has no default package manager, so PIP packages (like paho.mqtt) cannot easily be installed. And since cec-client is not functioning properly on my system, the more difficult cec-ctl had to be u…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# systemd unit that keeps the HDMI-CEC HTTP bridge script running.
[Unit]
Description=HDMI-CEC HTTP Bridge
After=multi-user.target

[Service]
Type=simple
Restart=always
# Wait between restarts: with the default delay a script that fails right at
# startup is restarted so quickly that systemd's start rate limit trips and
# the unit stays failed despite Restart=always.
RestartSec=5
ExecStart=/usr/bin/python3 /storage/LibreELEC-11.0_HDMI-CEC_HTTP_bridge.py

[Install]
WantedBy=multi-user.target
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from http.server import BaseHTTPRequestHandler, HTTPServer | |
import re | |
import subprocess | |
import json | |
# Function: add value to dictionary | |
def nested_set(dic, keys, value):
    """Store *value* in the nested dictionary *dic* under the path *keys*.

    Intermediate dictionaries are created on demand; existing ones are reused.

    Args:
        dic: Dictionary to modify in place.
        keys: Iterable of keys describing the path; the last key receives
            *value*. Any iterable is accepted, not only sequences.
        value: Object to store at the end of the path.

    Raises:
        IndexError: If *keys* is empty.
    """
    # Materialise first so generators and other one-shot iterables work too
    path = list(keys)
    if not path:
        raise IndexError('nested_set() needs at least one key')
    *parents, leaf = path
    for key in parents:
        dic = dic.setdefault(key, {})
    dic[leaf] = value
# Setup HTTP request handler | |
class S(BaseHTTPRequestHandler):
    """HTTP request handler that translates requests into `cec-ctl` calls.

    Endpoints:
        GET  /topology                  JSON description of the CEC devices
        GET  /state/logical/<addr>      power state reported by device <addr>
        POST /state/logical/<addr>      body 'on' or 'standby'
        POST /source/logical/<addr>     body is a physical address (n.n.n.n)
        POST /keypress/logical/<addr>   body is a cec-ctl ui-cmd name

    `cec-ctl` is invoked with an argument list (no shell); the filtering the
    shell pipelines used to do is performed in Python.
    """

    # Largest POST body (in bytes) that will be read
    MAX_BODY_LENGTH = 255
    # Validation patterns, always applied with fullmatch() so that a trailing
    # newline or a non-ASCII digit cannot slip through
    _LOGICAL_ADDRESS = re.compile(r'[0-9]+')
    _PHYSICAL_ADDRESS = re.compile(r'[0-9]\.[0-9]\.[0-9]\.[0-9]')
    _UI_COMMAND = re.compile(r'[a-zA-Z0-9-]*')

    # --- Helpers ---

    def _reply(self, status, body, content_type='text/plain'):
        """Send a complete response consisting of *status* and a text *body*."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(body.encode('utf-8'))

    def _logical_address(self):
        """Return the address segment of the request path, or None if invalid."""
        segments = self.path.split('/')
        if len(segments) > 3 and self._LOGICAL_ADDRESS.fullmatch(segments[3]):
            return segments[3]
        return None

    @staticmethod
    def _cec_ctl(*args):
        """Run `cec-ctl` with *args*; return (succeeded, decoded stdout)."""
        try:
            result = subprocess.run(['cec-ctl'] + list(args), stdout=subprocess.PIPE)
        except OSError:
            # cec-ctl is missing or could not be started
            return False, ''
        return result.returncode == 0, result.stdout.decode('utf-8', errors='replace')

    @staticmethod
    def _pick_field(output, marker, cut_parenthesis):
        """Extract the value after the colon from every line containing *marker*.

        Python equivalent of `grep marker | cut -d ":" -f2 | tr -d " "`, with
        an optional `cut -d "(" -f1` step in between.
        """
        values = []
        for line in output.split('\n'):
            if marker not in line:
                continue
            fields = line.split(':')
            # Like cut: a line without the delimiter is passed through whole
            value = fields[1] if len(fields) > 1 else line
            if cut_parenthesis:
                value = value.split('(')[0]
            values.append(value.replace(' ', ''))
        return '\n'.join(values).strip()

    @staticmethod
    def _os_release_name():
        """Return the NAME value(s) from /etc/os-release ('' if unreadable)."""
        try:
            with open('/etc/os-release', encoding='utf-8', errors='replace') as handle:
                lines = handle.read().split('\n')
        except OSError:
            return ''
        names = []
        for line in lines:
            if 'NAME' in line and 'PRETTY_NAME' not in line:
                fields = line.split('=')
                value = fields[1] if len(fields) > 1 else line
                names.append(value.replace('"', ''))
        # Matching lines are emitted in reverse order, separated by spaces
        return ' '.join(reversed(names)).strip()

    @staticmethod
    def _squeeze(text):
        """Drop tabs and collapse runs of spaces into a single space."""
        return re.sub(' +', ' ', text.replace('\t', ''))

    @staticmethod
    def _key_values(text):
        """Yield (key, raw value) for every 'key : value' line in *text*."""
        for line in text.split('\n'):
            parts = line.split(' : ')
            if len(parts) > 1:
                yield parts[0].strip(), parts[1]

    @staticmethod
    def _index_device(topology, device):
        """Register *device* in the logical, physical and name indexes."""
        topology['logical'][device.get('Logical Address')] = device
        topology['physical'][device.get('Physical Address')] = device
        vendor = device.get('Vendor ID')
        osd_name = device.get('OSD Name')
        # A device that reported no vendor or name cannot be indexed by name
        if vendor is not None and osd_name is not None:
            topology['name'][vendor + ' - ' + osd_name] = device

    def _topology(self):
        """Build the topology dictionary, or return None if cec-ctl failed."""
        own_ok, own_info = self._cec_ctl('--logical-address')
        if not own_ok:
            return None
        bus_ok, bus_info = self._cec_ctl('--show-topology', '--skip-info')
        if not bus_ok:
            return None

        topology = {
            'logical': {},
            'physical': {},
            'name': {}
        }
        devices = []

        # --- 1: the adapter of this host ---
        host = {'host': 'true'}
        for key, raw_value in self._key_values(self._squeeze(own_info)):
            if key == 'Logical Address':
                # Keep only the part before the parenthesised description
                host[key] = raw_value.split('(')[0].strip().replace('\'', '')
            elif key == 'OSD Name':
                # Report the operating system name instead of the adapter's
                host[key] = self._os_release_name()
            else:
                host[key] = raw_value.strip().replace('\'', '')
        self._index_device(topology, host)
        topology['host'] = host
        devices.append(host)

        # --- 2: the other devices ---
        # Only the part before the first blank line is parsed (the original
        # comment calls the remainder the topology summary)
        bus_info = self._squeeze(bus_info.split('\n\n')[0])
        if bus_info.strip() != 'Topology:':
            for chunk in bus_info.split('System Information for device '):
                if not chunk:
                    continue
                # Each chunk starts with the logical address of the device
                device = {'Logical Address': chunk.split(' ')[0].strip()}
                for key, raw_value in self._key_values(chunk):
                    device[key] = raw_value.strip().replace('\'', '')
                self._index_device(topology, device)
                devices.append(device)

        # --- 3: flat list of everything found ---
        topology['list'] = devices
        return topology

    # --- Request handlers ---

    # Handle GET requests
    def do_GET(self):
        """Serve /topology and /state/logical/<addr>."""
        # Handle /topology
        if self.path == '/topology':
            topology = self._topology()
            if topology is None:
                self._reply(500, 'CEC command failed!')
            else:
                self._reply(200, json.dumps(topology, indent=None), 'application/json')
        # Handle /state/logical/$
        elif self.path.startswith('/state/logical/'):
            address = self._logical_address()
            if address is None:
                self._reply(400, 'Incomplete request!')
                return
            _, output = self._cec_ctl('--to', address, '--give-device-power-status')
            state = self._pick_field(output, 'pwr-state', True)
            if state:
                self._reply(200, state)
            else:
                self._reply(500, 'Empty CEC response!')
        # Not found
        else:
            self._reply(404, 'GET endpoint not found')

    # Handle POST requests
    def do_POST(self):
        """Serve the state, source and keypress commands."""
        # A missing, non-numeric or negative length must not reach rfile.read()
        try:
            content_length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self._reply(400, 'Content-Length missing or invalid!')
            return
        # Check POST data size
        # NOTE(review): 413 would be the conventional status; 507 and the
        # message text are kept so existing clients see no change
        if content_length > self.MAX_BODY_LENGTH:
            self._reply(507, 'POST body to large!')
            return
        # Retrieve POST data; undecodable bytes simply fail validation below
        post_data = self.rfile.read(content_length).decode('utf-8', errors='replace')

        # Handle /state/logical/$
        if self.path.startswith('/state/logical/'):
            option = {'on': '--image-view-on', 'standby': '--standby'}.get(post_data)
            address = self._logical_address()
            if address is None:
                self._reply(400, 'Incomplete request!')
            elif option is None:
                self._reply(400, 'POST body invalid!')
            else:
                succeeded, _ = self._cec_ctl('--to', address, option)
                if succeeded:
                    self._reply(200, post_data)
                else:
                    self._reply(500, 'CEC command failed!')
        # Handle /source/logical/$
        elif self.path.startswith('/source/logical/'):
            address = self._logical_address()
            if address is None:
                self._reply(400, 'Incomplete request!')
            elif not self._PHYSICAL_ADDRESS.fullmatch(post_data):
                self._reply(400, 'POST body invalid!')
            else:
                _, output = self._cec_ctl('--to', address, '--image-view-on',
                                          '--active-source', 'phys-addr=' + post_data)
                self._reply(200, self._pick_field(output, 'phys-addr', False))
        # Handle /keypress/logical/$
        elif self.path.startswith('/keypress/logical/'):
            address = self._logical_address()
            if address is None:
                self._reply(400, 'Incomplete request!')
            elif not (self._UI_COMMAND.fullmatch(post_data) and content_length < 64):
                self._reply(400, 'POST body invalid!')
            else:
                # Send the key to the addressed device, not always to device 0
                _, output = self._cec_ctl('--to', address, '--user-control-pressed',
                                          'ui-cmd=' + post_data)
                self._reply(200, self._pick_field(output, 'ui-cmd', True))
        # Not found
        else:
            self._reply(404, 'POST endpoint not found')
# Setup HTTP server | |
def run(server_class=HTTPServer, handler_class=S, port=8232):
    """Serve HTTP requests on all interfaces until interrupted.

    Args:
        server_class: Server type to instantiate.
        handler_class: Request handler class handed to the server.
        port: TCP port to listen on.
    """
    # An empty host binds to every interface
    httpd = server_class(('', port), handler_class)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C simply stops a foreground run
        pass
    finally:
        # Release the listening socket however the loop ended
        httpd.server_close()
# Run HTTP server | |
if __name__ == '__main__':
    import sys

    # Exactly one command-line argument is taken as the port to listen on;
    # with any other argument count the default port is used
    cli_arguments = sys.argv[1:]
    if len(cli_arguments) == 1:
        run(port=int(cli_arguments[0]))
    else:
        run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Bridge-wide items: the raw /topology JSON and the reachability channels of the bridge host
String Woonkamer_CEC_topology "CEC topology" <text> { channel="http:url:woonkamer_cec:cec_topology" } | |
Switch Woonkamer_CEC_online "Online" <switch> { channel="network:servicedevice:woonkamer_cec:online" } | |
Number:Time Woonkamer_CEC_latency "Latency" <time> { channel="network:servicedevice:woonkamer_cec:latency" } | |
DateTime Woonkamer_CEC_last_seen "Last seen" <time> { channel="network:servicedevice:woonkamer_cec:lastseen" } | |
// TV: the address items are filled from the topology channels; the state, source and
// key press items have no channel and are handled by the rules
String Woonkamer_CEC_logical_address_TV "CEC logical address: TV" <text> { channel="http:url:woonkamer_cec:cec_logical_address_tv" } | |
String Woonkamer_CEC_physical_address_TV "CEC physical address: TV" <text> { channel="http:url:woonkamer_cec:cec_physical_address_tv" } | |
Switch Woonkamer_CEC_state_TV "CEC state: TV" <switch> { autoupdate="true" } | |
String Woonkamer_CEC_source_TV "CEC source: TV" <receiver> { autoupdate="true" } | |
String Woonkamer_CEC_keypress_TV "CEC key press: TV" <mediacontrol> { autoupdate="true" } | |
// Kodi: this is the bridge host itself, so its state item is linked to the bridge's online channel
String Woonkamer_CEC_logical_address_Kodi "CEC logical address: Kodi" <text> { channel="http:url:woonkamer_cec:cec_logical_address_kodi" } | |
String Woonkamer_CEC_physical_address_Kodi "CEC physical address: Kodi" <text> { channel="http:url:woonkamer_cec:cec_physical_address_kodi" } | |
Switch Woonkamer_CEC_state_Kodi "CEC state: Kodi" <switch> { channel="network:servicedevice:woonkamer_cec:online" } | |
String Woonkamer_CEC_keypress_Kodi "CEC key press: Kodi" <mediacontrol> { autoupdate="true" } | |
// PlayStation 4
String Woonkamer_CEC_logical_address_PlayStation4 "CEC logical address: PlayStation 4" <text> { channel="http:url:woonkamer_cec:cec_logical_address_playstation4" } | |
String Woonkamer_CEC_physical_address_PlayStation4 "CEC physical address: PlayStation 4" <text> { channel="http:url:woonkamer_cec:cec_physical_address_playstation4" } | |
Switch Woonkamer_CEC_state_PlayStation4 "CEC state: PlayStation 4" <switch> { autoupdate="true" } | |
String Woonkamer_CEC_keypress_PlayStation4 "CEC key press: PlayStation 4" <mediacontrol> { autoupdate="true" } | |
// Chromecast
String Woonkamer_CEC_logical_address_Chromecast "CEC logical address: Chromecast" <text> { channel="http:url:woonkamer_cec:cec_logical_address_chromecast" } | |
String Woonkamer_CEC_physical_address_Chromecast "CEC physical address: Chromecast" <text> { channel="http:url:woonkamer_cec:cec_physical_address_chromecast" } | |
Switch Woonkamer_CEC_state_Chromecast "CEC state: Chromecast" <switch> { autoupdate="true" } | |
String Woonkamer_CEC_keypress_Chromecast "CEC key press: Chromecast" <mediacontrol> { autoupdate="true" } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// --- Config ---
// Host and port of the HDMI-CEC HTTP bridge (replace the placeholder; no trailing dot)
val WoonkamerCEChost = "192.168.x.x"
val WoonkamerCECport = "8232"
// Timeout in milliseconds for the HTTP calls; also used as the wait after switching the TV on
val WoonkamerCECtimeout = 5000
// --- TV --- | |
// Poll the power state of the TV every five seconds and mirror it into the state item
rule "CEC state status: TV"
when
    Time cron "0/5 * * * * *"
then
    // Nothing to poll until the logical address item has been populated
    val address = Woonkamer_CEC_logical_address_TV.state
    if (address == NULL || address == UNDEF) return;
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + address
    val output = sendHttpGetRequest(url, newHashMap(), WoonkamerCECtimeout)
    if (output == 'on' || output == 'to-on') Woonkamer_CEC_state_TV.postUpdate(ON)
    if (output == 'standby' || output == 'to-standby') Woonkamer_CEC_state_TV.postUpdate(OFF)
end
// Forward an ON/OFF command on the TV state item to the bridge
rule "CEC state command: TV"
when
    Item Woonkamer_CEC_state_TV received command
then
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + Woonkamer_CEC_logical_address_TV.state
    // Use the command that triggered the rule; the item state may not reflect it yet
    val command = if (receivedCommand == ON) "on" else "standby"
    sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
end
// Switch the TV to the source named in the command (switching the TV on first if needed)
rule "CEC source command: TV"
when
    Item Woonkamer_CEC_source_TV received command
then
    if (Woonkamer_CEC_state_TV.state != ON) {
        Woonkamer_CEC_state_TV.sendCommand(ON)
        // Give the TV time to wake up before changing the source
        Thread::sleep(WoonkamerCECtimeout)
    }
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/source/logical/" + Woonkamer_CEC_logical_address_TV.state
    // Use the command that triggered the rule; the item state may not reflect it yet
    val source = receivedCommand.toString
    var command = ""
    if (source == "Kodi") {
        command = "" + Woonkamer_CEC_physical_address_Kodi.state
    } else if (source == "PlayStation 4") {
        command = "" + Woonkamer_CEC_physical_address_PlayStation4.state
    } else if (source == "Chromecast") {
        command = "" + Woonkamer_CEC_physical_address_Chromecast.state
    }
    // An unknown source has no physical address; the bridge would reject an empty body
    if (command != "") {
        sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
    }
    // Clear the item again so the same source can be selected next time
    Thread::sleep(50)
    Woonkamer_CEC_source_TV.postUpdate("")
end
// Forward a key press command for the TV to the bridge
rule "CEC key press command: TV"
when
    Item Woonkamer_CEC_keypress_TV received command
then
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_TV.state
    // Use the command that triggered the rule; the item state may not reflect it yet
    val command = receivedCommand.toString
    sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
    // Clear the item again so the same key can be sent next time
    Thread::sleep(50)
    Woonkamer_CEC_keypress_TV.postUpdate("")
end
// --- Kodi --- | |
// Forward a key press command for Kodi to the bridge
rule "CEC key press command: Kodi"
when
    Item Woonkamer_CEC_keypress_Kodi received command
then
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_Kodi.state
    // Use the command that triggered the rule; the item state may not reflect it yet
    val command = receivedCommand.toString
    sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
    // Clear the item again so the same key can be sent next time
    Thread::sleep(50)
    Woonkamer_CEC_keypress_Kodi.postUpdate("")
end
// --- PlayStation 4 --- | |
// Poll the power state of the PlayStation 4 every five seconds and mirror it into the state item
rule "CEC state status: PlayStation 4"
when
    Time cron "0/5 * * * * *"
then
    // Nothing to poll until the logical address item has been populated
    val address = Woonkamer_CEC_logical_address_PlayStation4.state
    if (address == NULL || address == UNDEF) return;
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + address
    val output = sendHttpGetRequest(url, newHashMap(), WoonkamerCECtimeout)
    if (output == 'on' || output == 'to-on') Woonkamer_CEC_state_PlayStation4.postUpdate(ON)
    if (output == 'standby' || output == 'to-standby') Woonkamer_CEC_state_PlayStation4.postUpdate(OFF)
end
// Forward a key press command for the PlayStation 4 to the bridge
rule "CEC key press command: PlayStation 4"
when
    Item Woonkamer_CEC_keypress_PlayStation4 received command
then
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_PlayStation4.state
    // Use the command that triggered the rule; the item state may not reflect it yet
    val command = receivedCommand.toString
    sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
    // Clear the item again so the same key can be sent next time
    Thread::sleep(50)
    Woonkamer_CEC_keypress_PlayStation4.postUpdate("")
end
// --- Chromecast --- | |
// Poll the power state of the Chromecast every five seconds and mirror it into the state item
rule "CEC state status: Chromecast"
when
    Time cron "0/5 * * * * *"
then
    // Nothing to poll until the logical address item has been populated
    val address = Woonkamer_CEC_logical_address_Chromecast.state
    if (address == NULL || address == UNDEF) return;
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + address
    val output = sendHttpGetRequest(url, newHashMap(), WoonkamerCECtimeout)
    if (output == 'on' || output == 'to-on') Woonkamer_CEC_state_Chromecast.postUpdate(ON)
    if (output == 'standby' || output == 'to-standby') Woonkamer_CEC_state_Chromecast.postUpdate(OFF)
end
// Forward a key press command for the Chromecast to the bridge
rule "CEC key press command: Chromecast"
when
    Item Woonkamer_CEC_keypress_Chromecast received command
then
    val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_Chromecast.state
    // Use the command that triggered the rule; the item state may not reflect it yet
    val command = receivedCommand.toString
    sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
    // Clear the item again so the same key can be sent next time
    Thread::sleep(50)
    Woonkamer_CEC_keypress_Chromecast.postUpdate("")
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// HTTP Thing for the HDMI-CEC bridge. Every channel requests the same /topology
// endpoint; all but the first pick a single field from the JSON with JSONPATH,
// looking the device up under 'name' by its "<Vendor ID> - <OSD Name>" key.
Thing http:url:woonkamer_cec "CEC" @ "Woonkamer" [ | |
baseURL = "http://192.168.x.x:8232", | |
refresh = "300", | |
timeout = "20000", | |
bufferSize = "9192", | |
stateMethod = "GET", | |
contentType = "application/json", | |
ignoreSSLErrors = "true" | |
] { | |
Channels: | |
// Complete topology JSON, untransformed
Type string : cec_topology "CEC topology" [ | |
stateExtension = "/topology", | |
mode = "READONLY" | |
] | |
// TV
Type string : cec_logical_address_tv "CEC logical address: TV" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x00903e (Philips) - TV'].['Logical Address']", | |
mode = "READONLY" | |
] | |
Type string : cec_physical_address_tv "CEC physical address: TV" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x00903e (Philips) - TV'].['Physical Address']", | |
mode = "READONLY" | |
] | |
// Kodi
Type string : cec_logical_address_kodi "CEC logical address: Kodi" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x001582 (Pulse-Eight) - LibreELEC'].['Logical Address']", | |
mode = "READONLY" | |
] | |
Type string : cec_physical_address_kodi "CEC physical address: Kodi" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x001582 (Pulse-Eight) - LibreELEC'].['Physical Address']", | |
mode = "READONLY" | |
] | |
// PlayStation 4
Type string : cec_logical_address_playstation4 "CEC logical address: PlayStation 4" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x080046 (Sony) - PlayStation 4'].['Logical Address']", | |
mode = "READONLY" | |
] | |
Type string : cec_physical_address_playstation4 "CEC physical address: PlayStation 4" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x080046 (Sony) - PlayStation 4'].['Physical Address']", | |
mode = "READONLY" | |
] | |
// Chromecast
Type string : cec_logical_address_chromecast "CEC logical address: Chromecast" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x001a11 (Google) - Chromecast'].['Logical Address']", | |
mode = "READONLY" | |
] | |
Type string : cec_physical_address_chromecast "CEC physical address: Chromecast" [ | |
stateExtension = "/topology", | |
stateTransformation = "JSONPATH:$.['name'].['0x001a11 (Google) - Chromecast'].['Physical Address']", | |
mode = "READONLY" | |
] | |
} | |
// Network Thing watching the bridge's port 8232; the items link to its
// online, latency and lastseen channels.
Thing network:servicedevice:woonkamer_cec "CEC" @ "Woonkamer" [ | |
hostname = "192.168.x.x", | |
port = 8232, | |
retry = 1, | |
timeout = 5000, | |
refreshInterval = 5000 | |
] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment