Skip to content

Instantly share code, notes, and snippets.

@BenJamesAndo
Created October 29, 2025 11:34
Show Gist options
  • Select an option

  • Save BenJamesAndo/4da1f7ca96666ad218721d572a7b1b0f to your computer and use it in GitHub Desktop.

Select an option

Save BenJamesAndo/4da1f7ca96666ad218721d572a7b1b0f to your computer and use it in GitHub Desktop.
Crestron RoomView AppDaemon for Home Assistant
# AppDaemon app definition for the Crestron projector app (goes in your AppDaemon apps YAML file).
crestron_projector:
  module: crestron_projector
  class: CrestronProjector
  ip_address: "192.168.1.139"  # Change to your projector's IP address
  debug: false  # Set to true to enable debug logging
import appdaemon.plugins.hass.hassapi as hass
import socket
import threading
import struct
import re
class CrestronProjector(hass.Hass):
    """AppDaemon app that drives a projector over a persistent TCP connection.

    The app keeps a socket open to the configured ``ip_address`` (port 41794),
    sends fixed byte-string commands, parses the packets the projector sends
    back, and mirrors the result into Home Assistant entities: one media
    player, two switches (freeze, AV mute) and six menu-navigation buttons.
    """
def initialize(self):
    """Prepare configuration, lookup tables and the initial entity states.

    Reads ``ip_address`` (required) and ``debug`` (optional) from the app
    arguments, builds the command and source tables, publishes the media
    player / switch / button entities, subscribes to ``call_service`` events
    and schedules ``start_connection`` to run immediately.
    """
    # Configuration taken from the app arguments plus fixed settings.
    self.ip = self.args["ip_address"]
    self.port = 41794
    self.entity_id = "media_player.crestron_projector"
    self.debug = self.args.get("debug", False)
    self.keepalive_interval = 10  # Keep-alive ping every 10 seconds

    # Connection bookkeeping, filled in once the listener thread is running.
    self.socket = None
    self.listener_thread = None
    self.running = False
    self.current_volume = None

    # Most commands share a 7-byte prefix and differ only in the last two bytes.
    shared_prefix = b"\x05\x00\x06\x00\x00\x03\x00"

    def two_byte_command(first, second):
        return shared_prefix + bytes((first, second))

    # (friendly name, status/command code, key in self.commands)
    source_table = (
        ("Computer1", 0xcd, "source_computer1"),
        ("Computer2", 0xce, "source_computer2"),
        ("Video", 0xcf, "source_video"),
        ("S-Video", 0xd0, "source_svideo"),
        ("HDMI", 0xd1, "source_hdmi"),
        ("PC Less Presentation", 0xd2, "source_pc_less"),
        ("USB Display", 0xd3, "source_usb_display"),
        ("LAN Display", 0xd4, "source_lan_display"),
    )

    self.commands = {
        "power_on": two_byte_command(0x04, 0x00),
        "power_off": two_byte_command(0x05, 0x00),
        "volume_up": two_byte_command(0xfa, 0x13),
        "volume_down": two_byte_command(0xfb, 0x13),
        "status": b"\x05\x00\x05\x00\x00\x02\x03\x1e",
        "init_lang": b"\x05\x00\x09\x00\x00\x06\x15\x13\xe1\x01\x65\x6e",
        "init_cmd": two_byte_command(0x00, 0x00),
        "query_caps": b"\x05\x00\x05\x00\x00\x02\x03\x1d",
        "keepalive_ping": b"\x0d\x00\x02\x00\x00",
    }
    # Input sources
    self.commands.update(
        {key: two_byte_command(code, 0x13) for _, code, key in source_table}
    )
    # Menu navigation, freeze and AV mute
    self.commands.update({
        "menu": two_byte_command(0x1d, 0x14),
        "enter": two_byte_command(0x23, 0x14),
        "up": two_byte_command(0x1e, 0x14),
        "down": two_byte_command(0x1f, 0x14),
        "left": two_byte_command(0x20, 0x14),
        "right": two_byte_command(0x21, 0x14),
        "freeze": two_byte_command(0xf0, 0x13),
        "unfreeze": two_byte_command(0xf1, 0x13),
        "av_mute": two_byte_command(0xfc, 0x13),
        "av_unmute": two_byte_command(0xfd, 0x13),
    })

    # All possible sources; the usable subset is narrowed after connection.
    self.all_source_names = [name for name, _, _ in source_table]
    self.all_source_map = {name: key for name, _, key in source_table}
    # Byte code -> friendly name, used when parsing status packets.
    self.source_code_map = {code: name for name, code, _ in source_table}

    # Sources offered until (and unless) capabilities are detected.
    fallback_sources = ["HDMI", "S-Video", "Computer1", "Video"]
    self.source_list = fallback_sources
    self.source_map = {
        name: self.all_source_map[name] for name in fallback_sources
    }
    self.detected_capabilities = []
    self.capabilities_detected = False

    # SUPPORT_TURN_ON (128) | SUPPORT_TURN_OFF (256) | SUPPORT_VOLUME_STEP (1024)
    # | SUPPORT_VOLUME_SET (4) | SUPPORT_SELECT_SOURCE (2048) = 3460
    device_info = {
        "identifiers": [["crestron_projector", self.ip]],
        "name": "Crestron Projector",
        "manufacturer": "Crestron",
        "model": "Room View",
        "sw_version": "1.0"
    }
    initial_attrs = {
        "supported_features": 3460,
        "friendly_name": "Crestron Projector",
        "source_list": self.source_list,
        "device_class": "tv",
        "icon": "mdi:projector",
        "device_info": device_info,
    }
    self.set_state(self.entity_id, state="unavailable", attributes=initial_attrs)

    switch_entities = (
        ("switch.crestron_projector_freeze", "Projector Freeze", "mdi:pause-circle"),
        ("switch.crestron_projector_av_mute", "Projector AV Mute", "mdi:audio-video-off"),
    )
    for switch_id, friendly_name, icon in switch_entities:
        self.set_state(switch_id, state="off", attributes={
            "friendly_name": friendly_name,
            "icon": icon
        })

    self.create_button_entities()
    self.listen_event(self.handle_service, "call_service")
    self.run_in(self.start_connection, 0)
def create_button_entities(self):
    """Publish one button entity per menu-navigation command.

    Each button's state is set to the current local time in ISO format.
    """
    import datetime

    # (entity suffix, label used in the friendly name, icon)
    button_specs = (
        ("menu", "Menu", "mdi:menu"),
        ("enter", "Enter", "mdi:keyboard-return"),
        ("up", "Up", "mdi:chevron-up"),
        ("down", "Down", "mdi:chevron-down"),
        ("left", "Left", "mdi:chevron-left"),
        ("right", "Right", "mdi:chevron-right"),
    )
    for suffix, label, icon in button_specs:
        button_attrs = {
            "friendly_name": f"Projector {label}",
            "icon": icon,
            "device_class": "button"
        }
        pressed_at = datetime.datetime.now().isoformat()
        self.set_state(
            f"button.crestron_projector_{suffix}",
            state=pressed_at,
            attributes=button_attrs,
        )
def start_connection(self, kwargs):
    """Scheduler callback: launch the daemon thread that runs ``listen_for_updates``.

    ``kwargs`` is the scheduler's argument dict and is not used. Any error
    raised while creating or starting the thread is logged, not propagated.
    """
    try:
        self.running = True
        worker = threading.Thread(target=self.listen_for_updates, daemon=True)
        self.listener_thread = worker
        worker.start()
        self.log("Started persistent connection to projector")
    except Exception as e:
        self.log(f"Error starting connection: {e}", level="ERROR")
def finalize_capabilities(self, kwargs):
    """Scheduler callback: replace the fallback source list with the detected one.

    Does nothing once capabilities have been finalized. If no source was
    detected, the fallback list stays in place (logged in debug mode only).

    ``kwargs`` is the scheduler's argument dict and is not used.
    """
    if self.capabilities_detected:
        return

    if not self.detected_capabilities:
        if self.debug:
            self.log("No capabilities detected, using fallback sources")
        return

    # Copy so later changes to the detection list cannot alter the published list.
    detected = list(self.detected_capabilities)
    self.source_list = detected
    self.source_map = {name: self.all_source_map[name] for name in detected}
    self.capabilities_detected = True

    current_state = self.get_state(self.entity_id)
    # get_state may return None if the entity is missing; fall back to no attributes.
    entity = self.get_state(self.entity_id, attribute="all") or {}
    attrs = dict(entity.get("attributes", {}))
    attrs["source_list"] = self.source_list
    self.set_state(self.entity_id, state=current_state, attributes=attrs)
    self.log(f"Capabilities finalized: {len(detected)} sources detected: {', '.join(detected)}")
def listen_for_updates(self):
    """Run the connect / handshake / receive cycle until ``self.running`` is cleared.

    Meant to run on the dedicated listener thread. Reconnection happens
    inside this loop (after a pause) rather than by scheduling another call
    to this method, so the blocking loop never moves onto a scheduler
    callback thread and a projector that closes the connection right away
    cannot cause a tight reconnect loop.
    """
    import time  # function-scope import, like the other local imports in this file

    while self.running:
        retry_delay = 1  # short pause after an orderly disconnect
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(10)
            sock.connect((self.ip, self.port))
            # Publish the socket only once it is connected, so other methods
            # never try to send on a half-open socket.
            self.socket = sock
            self.log("Connected to projector, performing handshake...")
            self._send_handshake(sock)
            self.log("Handshake complete, listening for updates...")
            sock.settimeout(1.0)  # short timeout so keep-alive and idle checks run regularly
            self._receive_until_disconnect(sock)
        except Exception as e:
            retry_delay = 10
            if self.running:
                self.log(f"Connection error: {e}, reconnecting in 10s...", level="WARNING")
        finally:
            self.socket = None
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass

        # Sleep in small steps so clearing self.running stops the thread quickly.
        wake_at = time.monotonic() + retry_delay
        while self.running and time.monotonic() < wake_at:
            time.sleep(0.25)

    self.log("Listener thread stopped.")

def _send_handshake(self, sock):
    """Send the four initialization commands, in order, on ``sock``."""
    steps = (
        ("status", "Sending status query..."),
        ("init_lang", "Sending language init..."),
        ("init_cmd", "Sending init command..."),
        ("query_caps", "Sending capability query..."),
    )
    for command, note in steps:
        if self.debug:
            self.log(note)
        sock.sendall(self.commands[command])

def _receive_until_disconnect(self, sock):
    """Receive and dispatch packets until the connection should be dropped.

    Returns when the peer closes the connection, a keep-alive cannot be
    sent, no data has arrived for more than 45 seconds, or ``self.running``
    is cleared. Other socket errors propagate to the caller.
    """
    last_keepalive = last_data_received = self.get_now_ts()
    buffer = b''
    while self.running:
        try:
            data = sock.recv(4096)
        except socket.timeout:
            data = None  # nothing arrived within the timeout

        if data is not None:
            if not data:
                self.log("Connection closed by projector")
                return
            last_data_received = self.get_now_ts()
            if self.debug:
                self.log(f"Received {len(data)} bytes: {data[:50].hex(':')}")
            buffer = self._consume_buffer(buffer + data)

        # Checked on every pass (not only on timeouts) so a steady stream of
        # incoming data cannot starve the keep-alive.
        now = self.get_now_ts()
        if now - last_keepalive >= self.keepalive_interval:
            if self.debug:
                self.log("Sending keep-alive ping.")
            try:
                sock.sendall(self.commands["keepalive_ping"])
                last_keepalive = now
            except Exception as e:
                self.log(f"Failed to send keepalive: {e}", level="WARNING")
                return

        idle = now - last_data_received
        if idle > 30 and self.get_state(self.entity_id) != "unavailable":
            self.log("No data received for 30 seconds, marking projector as unavailable", level="WARNING")
            entity = self.get_state(self.entity_id, attribute="all") or {}
            self.set_state(self.entity_id, state="unavailable", attributes=entity.get("attributes", {}))
        if idle > 45:
            self.log("No data received for 45 seconds, connection appears dead. Reconnecting...", level="WARNING")
            return

def _consume_buffer(self, buffer):
    """Strip complete packets from the front of ``buffer`` and return the rest.

    Packets starting with 0x01 / 0x02 / 0x0f and keep-alive pongs are
    skipped; complete 0x05 packets go to ``process_packet``. Incomplete
    packets (including a partial keep-alive pong) are left in place to wait
    for more data. Any other leading byte is dropped.
    """
    # Fixed sizes assumed for the handshake packet types.
    handshake_sizes = {0x0f: 4, 0x01: 10, 0x02: 7}
    pong = b'\x0e\x00\x02\x00\x00'

    while len(buffer) >= 2:
        packet_type = buffer[0]
        if self.debug and len(buffer) < 100:
            self.log(f"Buffer processing: type={packet_type:02x}, buffer_len={len(buffer)}, start={buffer[:20].hex(':')}")
        elif self.debug:
            self.log(f"Buffer processing: type={packet_type:02x}, buffer_len={len(buffer)}")

        # Handshake packets carry nothing this app uses.
        if packet_type in handshake_sizes:
            size = handshake_sizes[packet_type]
            if len(buffer) < size:
                if self.debug:
                    self.log("Waiting for more handshake data")
                break
            if self.debug:
                self.log(f"Skipping handshake packet {packet_type:02x}")
            buffer = buffer[size:]
            continue

        if packet_type == 0x0e:
            if buffer.startswith(pong):
                if self.debug:
                    self.log("Received keep-alive pong")
                buffer = buffer[len(pong):]
                continue
            if pong.startswith(buffer):
                # Only part of the pong has arrived; wait rather than
                # discarding bytes and losing packet alignment.
                break

        if packet_type == 0x05:
            if buffer[1] != 0x00:
                if self.debug:
                    self.log(f"Invalid Crestron header, expected 05:00, got {buffer[:2].hex(':')}")
                buffer = buffer[1:]
                continue
            if len(buffer) < 5:
                if self.debug:
                    self.log("Crestron packet too short (< 5 bytes), waiting...")
                break
            # Bytes 2-3 are read little-endian; total packet length is that value + 3.
            # NOTE(review): confirm this length encoding against the protocol specification.
            data_len = struct.unpack('<H', buffer[2:4])[0]
            packet_len = data_len + 3
            if self.debug:
                self.log(f"Crestron packet: header={buffer[:min(10,len(buffer))].hex(':')}, data_len={data_len}, packet_len={packet_len}, buffer={len(buffer)}")
            if len(buffer) < packet_len:
                if self.debug:
                    self.log(f"Waiting for complete packet (need {packet_len}, have {len(buffer)})")
                break
            packet = buffer[:packet_len]
            buffer = buffer[packet_len:]
            if self.debug:
                self.log(f"Processing Crestron packet: {len(packet)} bytes, header: {packet[:10].hex(':')}")
            self.process_packet(packet)
        else:
            if self.debug:
                self.log(f"Skipping invalid data at start of buffer: {buffer[0]:02x}")
            buffer = buffer[1:]

    return buffer
def process_packet(self, packet):
    """Interpret one packet from the projector and mirror it into entity state.

    Handles, in order: recovery from the "unavailable" state, 9-byte
    freeze / AV mute / source feedback, large (> 100 byte) status packets
    with embedded volume / freeze / AV mute / source patterns, standalone
    11-byte volume packets (which end processing early), text fields
    (manufacturer and model, lamp hours, power state), source capability
    detection, and 11-byte cooling / warming progress packets.

    Args:
        packet: raw bytes of a single packet.
    """
    digital_prefix = b'\x05\x00\x06\x00\x00\x03\x00'
    volume_prefix = b'\x05\x00\x08\x00\x00\x05\x14\x13\x93'
    cooling_prefix = b'\x05\x00\x08\x00\x00\x05\x14\x13\x92'
    warming_prefix = b'\x05\x00\x08\x00\x00\x05\x14\x13\x91'
    # Feedback code -> (entity, friendly name, icon, log label, resulting state),
    # valid when the byte after the code is 0x93.
    switch_feedback = {
        0xf1: ("switch.crestron_projector_freeze", "Projector Freeze", "mdi:pause-circle", "Freeze", "on"),
        0xf0: ("switch.crestron_projector_freeze", "Projector Freeze", "mdi:pause-circle", "Freeze", "off"),
        0xfd: ("switch.crestron_projector_av_mute", "Projector AV Mute", "mdi:audio-video-off", "AV Mute", "on"),
        0xfc: ("switch.crestron_projector_av_mute", "Projector AV Mute", "mdi:audio-video-off", "AV Mute", "off"),
    }

    def entity_attrs():
        """Return a copy of the media player's attributes ({} if the entity is missing)."""
        entity = self.get_state(self.entity_id, attribute="all") or {}
        return dict(entity.get("attributes", {}))

    def set_entity(state, **changes):
        """Write ``state`` to the media player, merging ``changes`` into its attributes."""
        attrs = entity_attrs()
        attrs.update(changes)
        self.set_state(self.entity_id, state=state, attributes=attrs)

    def update_attrs(**changes):
        """Merge ``changes`` into the media player's attributes, keeping its state."""
        set_entity(self.get_state(self.entity_id), **changes)

    def apply_switch(code, origin):
        """Publish the freeze / AV mute state encoded by ``code``."""
        switch_id, friendly_name, icon, label, state = switch_feedback[code]
        self.set_state(switch_id, state=state, attributes={
            "friendly_name": friendly_name,
            "icon": icon
        })
        if self.debug:
            self.log(f"{label} status{origin}: {state.upper()}")

    def apply_source(code, origin):
        """Publish the active source encoded by ``code``."""
        source_name = self.source_code_map[code]
        update_attrs(source=source_name)
        if self.debug:
            self.log(f"Source status{origin}: {source_name} (0x{code:02x})")

    def apply_volume(volume_bytes):
        """Publish the volume carried in two big-endian bytes."""
        volume_raw = struct.unpack('>H', volume_bytes)[0]
        volume_level_actual = volume_raw / 65535.0
        volume_percent = volume_level_actual * 100
        nearest_5 = round(volume_percent / 5) * 5
        # Snap to the nearest 5% step only when within 2% of it.
        if abs(volume_percent - nearest_5) <= 2:
            volume_level_display = nearest_5 / 100
        else:
            volume_level_display = volume_level_actual
        self.log(f"Volume update: raw={volume_raw} (0x{volume_bytes.hex()}) = {volume_level_display:.0%}")
        self.current_volume = volume_raw
        update_attrs(volume_raw=volume_raw, volume_level=volume_level_display)

    def find_joins(data):
        """Yield (code, marker) for every complete 9-byte pattern with the shared prefix.

        A pattern ending exactly at the end of ``data`` is included.
        """
        start = data.find(digital_prefix)
        while start != -1 and start + 9 <= len(data):
            yield data[start + 7], data[start + 8]
            start = data.find(digital_prefix, start + 1)

    # Any packet proves the connection works again.
    if self.get_state(self.entity_id) == "unavailable":
        self.log("Projector connection restored - setting to off and querying actual status")
        set_entity("off")
        self.run_in(lambda k: self.send_command("status"), 1.0)

    if self.debug and len(packet) < 100:
        hex_dump = packet.hex(':')
        ascii_dump = packet.decode(errors="ignore").replace('\x00', '·').replace('\x03', '│').replace('\x05', '┘')
        self.log(f"Packet [{len(packet)}B]: {hex_dump}")
        self.log(f"  ASCII: {ascii_dump}")
    elif self.debug:
        self.log(f"Received status packet: {len(packet)} bytes")

    # Standalone 9-byte feedback: 05:00:06:00:00:03:00:XX:YY
    #   YY == 0x93 -> freeze (f1 on / f0 off) or AV mute (fd on / fc off)
    #   YY == 0x13 -> active source XX
    if len(packet) == 9 and packet[:7] == digital_prefix:
        code, marker = packet[7], packet[8]
        if marker == 0x93 and code in switch_feedback:
            apply_switch(code, "")
        elif marker == 0x13 and code in self.source_code_map:
            apply_source(code, "")

    # Large status packets embed the same patterns; use the first of each kind.
    if len(packet) > 100:
        start = packet.find(volume_prefix)
        if start != -1 and start + 11 <= len(packet):
            apply_volume(packet[start + 9:start + 11])

        joins = list(find_joins(packet))
        for group in ((0xf0, 0xf1), (0xfc, 0xfd)):  # freeze codes, then AV mute codes
            code = next((c for c, marker in joins if marker == 0x93 and c in group), None)
            if code is not None:
                apply_switch(code, " (status packet)")
        code = next((c for c, marker in joins if marker == 0x13 and c in self.source_code_map), None)
        if code is not None:
            apply_source(code, " (status packet)")

    # Standalone volume packet: 05:00:08:00:00:05:14:13:93:XX:XX
    if len(packet) == 11 and packet[:9] == volume_prefix:
        apply_volume(packet[9:11])
        return

    # Text fields are searched in a lossy decode of the whole packet.
    response = packet.decode(errors="ignore")

    # Pattern: MANUFACTURER:Projector:MODEL
    manufacturer_match = re.search(r'([A-Z]+):Projector:([^:\x00]+)', response)
    if manufacturer_match:
        try:
            manufacturer = manufacturer_match.group(1).title()  # e.g., MITSUBISHI -> Mitsubishi
            model = manufacturer_match.group(2).strip()
            update_attrs(device_info={
                "identifiers": [["crestron_projector", self.ip]],
                "name": "Crestron Projector",
                "manufacturer": manufacturer,
                "model": model,
                "sw_version": "1.0"
            })
            if self.debug:
                self.log(f"Device detected: {manufacturer} {model}")
        except Exception as e:
            if self.debug:
                self.log(f"Error parsing model: {e}", level="WARNING")

    # Pattern: "1234 Hours"
    lamp_match = re.search(r'(\d+)\s*Hours', response)
    if lamp_match:
        try:
            lamp_hours = int(lamp_match.group(1))
            update_attrs(lamp_hours=lamp_hours)
            if self.debug:
                self.log(f"Lamp hours: {lamp_hours}")
        except Exception as e:
            if self.debug:
                self.log(f"Error parsing lamp hours: {e}", level="WARNING")

    # Collect source capabilities until they have been finalized.
    if not self.capabilities_detected:
        if self.debug:
            self.log(f"Checking for source capability in packet (length={len(packet)})")
        if len(packet) >= 8 and packet[4] == 0x00 and packet[5] == 0x03 and packet[6] == 0x00:
            capability_code = packet[7]
            if capability_code in self.source_code_map:
                source_name = self.source_code_map[capability_code]
                if source_name not in self.detected_capabilities:
                    self.detected_capabilities.append(source_name)
                    if self.debug:
                        self.log(f"Detected source capability: {source_name} (code: {capability_code:02x})")
                    # Finalize once, shortly after the first capability is seen,
                    # leaving time for the remaining capability packets.
                    if not getattr(self, 'capability_timer_set', False):
                        self.capability_timer_set = True
                        self.run_in(self.finalize_capabilities, 2)

    # Re-read the state: the steps above may have changed it.
    current_state = self.get_state(self.entity_id)
    if "Power On" in response:
        if current_state not in ["on", "warming_up"]:
            self.log("Projector power: ON")
            set_entity("on")
    elif "Cooling Down" in response:
        if not current_state or not current_state.startswith("Cooling Down"):
            self.log("Projector power: COOLING DOWN")
            set_entity("Cooling Down")
    elif "Warming Up" in response:
        if not current_state or not current_state.startswith("Warming Up"):
            self.log("Projector power: WARMING UP")
            set_entity("Warming Up")
    elif "Power Off" in response:
        if current_state != "off":
            self.log("Projector power: OFF")
            set_entity("off")

    # Cooling progress: 05:00:08:00:00:05:14:13:92:XX:XX (value scaled against 65000).
    if len(packet) == 11 and packet[:9] == cooling_prefix:
        progress_raw = struct.unpack('>H', packet[9:11])[0]
        progress_percent = min(100, int((progress_raw / 65000.0) * 100))
        if progress_percent <= 2:
            # A very low remaining value is treated as "cooling finished".
            self.log("Cooling complete - transitioning to OFF")
            set_entity("off")
        else:
            set_entity(f"Cooling Down {progress_percent}%", cooling_progress=progress_percent)
            self.log(f"Cooling progress: {progress_percent}% (raw: 0x{progress_raw:04x})")
    # Warming progress: 05:00:08:00:00:05:14:13:91:XX:XX (value scaled against 64000).
    elif len(packet) == 11 and packet[:9] == warming_prefix:
        progress_raw = struct.unpack('>H', packet[9:11])[0]
        progress_percent = min(100, int((progress_raw / 64000.0) * 100))
        if progress_percent >= 100:
            self.log("Warming complete - transitioning to ON")
            set_entity("on")
        elif self.get_state(self.entity_id) != "on":
            set_entity(f"Warming Up {progress_percent}%", warming_progress=progress_percent)
            self.log(f"Warming progress: {progress_percent}% (raw: 0x{progress_raw:04x})")
def terminate(self):
    """Stop the listener when the app is terminated.

    Clears ``self.running``, closes the current socket (if any) and waits up
    to two seconds for the listener thread to finish.
    """
    self.running = False

    # Work on a local reference: the listener thread may reset self.socket
    # to None at any moment.
    sock = self.socket
    if sock is not None:
        try:
            sock.close()
        except OSError:
            pass  # best effort; the socket may already be closed

    worker = self.listener_thread
    # join() raises RuntimeError on a thread that was never started.
    if worker is not None and worker.is_alive():
        worker.join(timeout=2)
def handle_service(self, event_name, data, kwargs):
    """Translate ``call_service`` events aimed at this app's entities into commands.

    Handles media player services for ``self.entity_id``, ``turn_on`` /
    ``turn_off`` for the freeze and AV mute switches, and ``press`` for the
    menu buttons. ``entity_id`` in the service data may be a single string
    or a list of strings. Events for anything else are ignored.

    Args:
        event_name: name of the event (not used).
        data: event payload with ``domain``, ``service`` and ``service_data``.
        kwargs: extra listener arguments (not used).
    """
    service_data = data.get("service_data") or {}
    domain = data.get("domain")
    service = data.get("service")

    # Normalize the targeted entities to a list.
    targets = service_data.get("entity_id")
    if targets is None:
        targets = []
    elif isinstance(targets, str):
        targets = [targets]
    else:
        targets = list(targets)

    if domain == "media_player":
        if self.entity_id not in targets:
            return
        if service == "turn_on":
            self.turn_on()
        elif service == "turn_off":
            self.turn_off()
        elif service == "volume_up":
            self.volume_up()
        elif service == "volume_down":
            self.volume_down()
        elif service == "volume_set":
            volume_level = service_data.get("volume_level")
            if volume_level is not None:
                self.volume_set(volume_level)
        elif service == "select_source":
            source = service_data.get("source")
            if source is not None:
                self.select_source(source)

    elif domain == "switch":
        switch_commands = {
            "switch.crestron_projector_freeze": {"turn_on": "freeze", "turn_off": "unfreeze"},
            "switch.crestron_projector_av_mute": {"turn_on": "av_mute", "turn_off": "av_unmute"},
        }
        for entity_id in targets:
            command = switch_commands.get(entity_id, {}).get(service)
            if command is None:
                continue
            self.send_command(command)
            # Query status (twice, staggered) so the projector's own feedback
            # confirms the new switch state.
            self.run_in(lambda k: self.send_command("status"), 0.5)
            self.run_in(lambda k: self.send_command("status"), 1.0)

    elif domain == "button" and service == "press":
        button_commands = {
            "button.crestron_projector_menu": "menu",
            "button.crestron_projector_enter": "enter",
            "button.crestron_projector_up": "up",
            "button.crestron_projector_down": "down",
            "button.crestron_projector_left": "left",
            "button.crestron_projector_right": "right",
        }
        for entity_id in targets:
            command = button_commands.get(entity_id)
            if command is None:
                continue
            self.send_command(command)
            self.update_button_state(entity_id)
def update_button_state(self, entity_id):
    """Set the button's state to the current time (ISO format) to register a press.

    Existing attributes are kept; if the entity cannot be read, the state is
    written with empty attributes instead of raising.
    """
    import datetime

    entity = self.get_state(entity_id, attribute="all") or {}
    attrs = entity.get("attributes", {})
    self.set_state(entity_id, state=datetime.datetime.now().isoformat(), attributes=attrs)
def send_command(self, command):
    """Send the pre-defined command named ``command`` to the projector.

    Uses the persistent socket when one is available; otherwise opens a
    short-lived connection with a timeout, so an unreachable projector
    cannot block the calling thread indefinitely. Failures are logged, not
    raised.

    Args:
        command: key into ``self.commands``.
    """
    payload = self.commands.get(command)
    if payload is None:
        self.log(f"Error sending command {command}: unknown command", level="ERROR")
        return

    try:
        # Local reference: the listener thread may reset self.socket to None
        # between the check and the send.
        sock = self.socket
        if sock is not None:
            sock.sendall(payload)
            self.log(f"Sent command: {command}")
        else:
            # Fallback to a temporary connection
            with socket.create_connection((self.ip, self.port), timeout=5) as s:
                s.sendall(payload)
            self.log(f"Sent command: {command} via temporary connection")
    except Exception as e:
        self.log(f"Error sending command {command}: {e}", level="ERROR")
def turn_on(self, **kwargs):
    """Send ``power_on`` unless the entity state shows the projector is cooling down."""
    state = self.get_state(self.entity_id)
    is_cooling = bool(state) and state.startswith("Cooling Down")
    if not is_cooling:
        self.send_command("power_on")
        return
    # Projector won't accept power on during cooldown
    self.log("Cannot turn on projector while cooling down", level="WARNING")
def turn_off(self, **kwargs):
    """Power the projector down by sending the ``power_off`` command."""
    command_name = "power_off"
    self.send_command(command_name)
def volume_up(self, **kwargs):
    """Raise the volume one step by sending the ``volume_up`` command."""
    command_name = "volume_up"
    self.send_command(command_name)
def volume_down(self, **kwargs):
    """Lower the volume one step by sending the ``volume_down`` command."""
    command_name = "volume_down"
    self.send_command(command_name)
def volume_set(self, volume_level):
    """Set the volume to an absolute level.

    Args:
        volume_level: target level from 0.0 to 1.0. Values outside that
            range are clamped so they cannot wrap around in the 16-bit
            field; values that cannot be converted to a number are rejected
            with a warning.
    """
    try:
        level = float(volume_level)
    except (TypeError, ValueError):
        self.log(f"Invalid volume level: {volume_level!r}", level="WARNING")
        return
    level = min(1.0, max(0.0, level))
    target_raw = int(level * 65535)
    if self.debug:
        self.log(f"Setting volume to {level:.2%} (raw: {target_raw}, 0x{target_raw:04x})")

    # Absolute volume set command: 05:00:08:00:00:05:14:13:93:XX:XX (XX:XX big-endian)
    command = b"\x05\x00\x08\x00\x00\x05\x14\x13\x93" + struct.pack(">H", target_raw)
    try:
        # Local reference: the listener thread may reset self.socket to None.
        sock = self.socket
        if sock is not None:
            sock.sendall(command)
            if self.debug:
                self.log(f"Sent absolute volume command: {command.hex(':')}")
        else:
            # Temporary connection with a timeout so the caller cannot hang.
            with socket.create_connection((self.ip, self.port), timeout=5) as s:
                s.sendall(command)
    except Exception as e:
        self.log(f"Error sending absolute volume command: {e}", level="ERROR")
def select_source(self, source):
    """Switch the projector to ``source`` if it is one of the currently offered sources.

    Names not present in ``self.source_map`` are logged as a warning and ignored.
    """
    if source not in self.source_map:
        self.log(f"Unknown source: {source}", level="WARNING")
        return

    self.send_command(self.source_map[source])
    if self.debug:
        self.log(f"Selected source: {source}")
@BenJamesAndo
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment