Created
October 29, 2025 11:34
-
-
Save BenJamesAndo/4da1f7ca96666ad218721d572a7b1b0f to your computer and use it in GitHub Desktop.
Crestron RoomView AppDaemon for Home Assistant
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# AppDaemon app definition for the Crestron projector app.
# `ip_address` is required by the app; `debug` is optional and defaults to false.
crestron_projector:
  module: crestron_projector
  class: CrestronProjector
  ip_address: "192.168.1.139"  # Change to your projector's IP address
  debug: false  # Set to true to enable debug logging
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import appdaemon.plugins.hass.hassapi as hass | |
| import socket | |
| import threading | |
| import struct | |
| import re | |
| class CrestronProjector(hass.Hass): | |
def initialize(self):
    """Prepare settings, command tables and Home Assistant entities.

    Reads ``ip_address`` (required) and ``debug`` (optional, default False)
    from the app arguments, builds the byte-level command table, publishes
    the media player and the two switch entities, creates the menu buttons,
    subscribes to ``call_service`` events and schedules the listener start.
    """
    # --- Connection settings and runtime state ---
    self.ip = self.args["ip_address"]
    self.port = 41794
    self.entity_id = "media_player.crestron_projector"
    self.debug = self.args.get("debug", False)
    self.keepalive_interval = 10  # Keep-alive ping every 10 seconds
    self.socket = None
    self.listener_thread = None
    self.running = False
    self.current_volume = None

    def frame(code, page=0x13):
        """Return the 9-byte command frame that ends with *code*, *page*."""
        return b"\x05\x00\x06\x00\x00\x03\x00" + bytes((code, page))

    # One row per input source: (code byte, friendly name, command key).
    source_table = (
        (0xcd, "Computer1", "source_computer1"),
        (0xce, "Computer2", "source_computer2"),
        (0xcf, "Video", "source_video"),
        (0xd0, "S-Video", "source_svideo"),
        (0xd1, "HDMI", "source_hdmi"),
        (0xd2, "PC Less Presentation", "source_pc_less"),
        (0xd3, "USB Display", "source_usb_display"),
        (0xd4, "LAN Display", "source_lan_display"),
    )

    # --- Command table (name -> raw bytes sent to the projector) ---
    self.commands = {
        "power_on": frame(0x04, 0x00),
        "power_off": frame(0x05, 0x00),
        "volume_up": frame(0xfa),
        "volume_down": frame(0xfb),
        "status": b"\x05\x00\x05\x00\x00\x02\x03\x1e",
        "init_lang": b"\x05\x00\x09\x00\x00\x06\x15\x13\xe1\x01\x65\x6e",
        "init_cmd": frame(0x00, 0x00),
        "query_caps": b"\x05\x00\x05\x00\x00\x02\x03\x1d",
        "keepalive_ping": b"\x0d\x00\x02\x00\x00",
    }
    # Input sources
    for code, _name, key in source_table:
        self.commands[key] = frame(code)
    # Menu navigation
    for key, code in (("menu", 0x1d), ("enter", 0x23), ("up", 0x1e),
                      ("down", 0x1f), ("left", 0x20), ("right", 0x21)):
        self.commands[key] = frame(code, 0x14)
    # Freeze and AV mute
    for key, code in (("freeze", 0xf0), ("unfreeze", 0xf1),
                      ("av_mute", 0xfc), ("av_unmute", 0xfd)):
        self.commands[key] = frame(code)

    # --- Source lookup tables (filtered by capabilities after connection) ---
    self.all_source_names = [name for _code, name, _key in source_table]
    self.all_source_map = {name: key for _code, name, key in source_table}
    # Source byte code -> friendly name, used when parsing status packets.
    self.source_code_map = {code: name for code, name, _key in source_table}

    # Sources offered until the projector's capabilities have been detected.
    fallback_sources = ["HDMI", "S-Video", "Computer1", "Video"]
    self.source_list = fallback_sources
    self.source_map = {name: self.all_source_map[name] for name in fallback_sources}
    self.detected_capabilities = []
    self.capabilities_detected = False

    # --- Home Assistant entities ---
    # SUPPORT_TURN_ON (128) | SUPPORT_TURN_OFF (256) | SUPPORT_VOLUME_STEP (1024)
    # | SUPPORT_VOLUME_SET (4) | SUPPORT_SELECT_SOURCE (2048) = 3460
    device_info = {
        "identifiers": [["crestron_projector", self.ip]],
        "name": "Crestron Projector",
        "manufacturer": "Crestron",
        "model": "Room View",
        "sw_version": "1.0",
    }
    initial_attrs = {
        "supported_features": 3460,
        "friendly_name": "Crestron Projector",
        "source_list": self.source_list,
        "device_class": "tv",
        "icon": "mdi:projector",
        "device_info": device_info,
    }
    self.set_state(self.entity_id, state="unavailable", attributes=initial_attrs)

    switches = (
        ("switch.crestron_projector_freeze", "Projector Freeze", "mdi:pause-circle"),
        ("switch.crestron_projector_av_mute", "Projector AV Mute", "mdi:audio-video-off"),
    )
    for switch_id, label, icon in switches:
        self.set_state(switch_id, state="off",
                       attributes={"friendly_name": label, "icon": icon})

    self.create_button_entities()

    # --- Event wiring and deferred connection start ---
    self.listen_event(self.handle_service, "call_service")
    self.run_in(self.start_connection, 0)
def create_button_entities(self):
    """Publish one button entity per menu-navigation command.

    Each button's state is set to the current local time in ISO format.
    """
    import datetime

    # (entity suffix, display name, icon)
    button_specs = (
        ("menu", "Menu", "mdi:menu"),
        ("enter", "Enter", "mdi:keyboard-return"),
        ("up", "Up", "mdi:chevron-up"),
        ("down", "Down", "mdi:chevron-down"),
        ("left", "Left", "mdi:chevron-left"),
        ("right", "Right", "mdi:chevron-right"),
    )
    for suffix, label, icon in button_specs:
        self.set_state(
            f"button.crestron_projector_{suffix}",
            state=datetime.datetime.now().isoformat(),
            attributes={
                "friendly_name": f"Projector {label}",
                "icon": icon,
                "device_class": "button",
            },
        )
def start_connection(self, kwargs):
    """Launch the daemon thread that keeps the projector connection open.

    ``kwargs`` is the scheduler callback argument and is not used.
    Any failure while starting the thread is logged instead of raised.
    """
    try:
        self.running = True
        worker = threading.Thread(target=self.listen_for_updates, daemon=True)
        self.listener_thread = worker
        worker.start()
    except Exception as e:
        self.log(f"Error starting connection: {e}", level="ERROR")
    else:
        self.log("Started persistent connection to projector")
def finalize_capabilities(self, kwargs):
    """Replace the fallback source list with the sources detected so far.

    Runs as a scheduler callback (``kwargs`` is unused). It acts only once:
    when at least one capability has been collected and the list has not
    been finalized yet. Otherwise it just emits a debug message.
    """
    if not self.detected_capabilities or self.capabilities_detected:
        if self.debug:
            self.log("No capabilities detected, using fallback sources")
        return

    detected = self.detected_capabilities
    self.source_list = detected
    self.source_map = {name: self.all_source_map[name] for name in detected}
    self.capabilities_detected = True

    # Republish the entity with its current state and the new source list.
    state_now = self.get_state(self.entity_id)
    attributes = self.get_state(self.entity_id, attribute="all").get("attributes", {})
    attributes["source_list"] = self.source_list
    self.set_state(self.entity_id, state=state_now, attributes=attributes)

    summary = ', '.join(self.detected_capabilities)
    self.log(f"Capabilities finalized: {len(self.detected_capabilities)} sources detected: {summary}")
def listen_for_updates(self):
    """Hold a persistent connection to the projector and dispatch its packets.

    Runs on the listener thread until ``self.running`` becomes False. Each
    pass of the outer loop opens a socket, sends the handshake commands,
    then reads data, splits it into frames and hands every ``0x05`` frame
    to ``process_packet``. While idle it sends keep-alive pings, marks the
    entity unavailable after 30 s without data and reconnects after 45 s.

    Reconnection happens inside this thread. The loop is never handed to
    the AppDaemon scheduler, because it blocks for the lifetime of the
    connection and would tie up a scheduler worker.
    """
    import time  # local import: only the reconnect back-off needs it

    pong = b'\x0e\x00\x02\x00\x00'
    # Fixed sizes of the handshake packets that are skipped, keyed by type byte.
    handshake_sizes = {0x0f: 4, 0x01: 10, 0x02: 7}
    handshake_steps = (
        ("status", "Sending status query..."),
        ("init_lang", "Sending language init..."),
        ("init_cmd", "Sending init command..."),
        ("query_caps", "Sending capability query..."),
    )

    def pause(seconds):
        """Sleep up to *seconds*, waking early once the app is stopping."""
        deadline = time.monotonic() + seconds
        while self.running and time.monotonic() < deadline:
            time.sleep(0.25)

    def drain(buffer):
        """Process every complete frame at the head of *buffer*; return the rest."""
        while len(buffer) >= 2:
            packet_type = buffer[0]
            if self.debug and len(buffer) < 100:
                self.log(f"Buffer processing: type={packet_type:02x}, buffer_len={len(buffer)}, start={buffer[:20].hex(':')}")
            elif self.debug:
                self.log(f"Buffer processing: type={packet_type:02x}, buffer_len={len(buffer)}")

            # Handshake packets (0x01, 0x02, 0x0f) carry nothing we use: skip them.
            if packet_type in handshake_sizes:
                size = handshake_sizes[packet_type]
                if len(buffer) < size:
                    if self.debug:
                        self.log("Waiting for more handshake data")
                    break
                if self.debug:
                    self.log(f"Skipping handshake packet {packet_type:02x}")
                buffer = buffer[size:]
                continue

            # Keep-alive response (0x0e): skip it once it has fully arrived.
            if packet_type == 0x0e:
                if len(buffer) < len(pong):
                    break  # possibly a partial pong; wait for the remaining bytes
                if buffer[:len(pong)] == pong:
                    if self.debug:
                        self.log("Received keep-alive pong")
                    buffer = buffer[len(pong):]
                    continue
                # Not a pong: fall through and resynchronise byte by byte.

            if packet_type != 0x05:
                if self.debug:
                    self.log(f"Skipping invalid data at start of buffer: {buffer[0]:02x}")
                buffer = buffer[1:]
                continue

            if buffer[1] != 0x00:
                if self.debug:
                    self.log(f"Invalid Crestron header, expected 05:00, got {buffer[:2].hex(':')}")
                buffer = buffer[1:]
                continue
            if len(buffer) < 5:
                if self.debug:
                    self.log("Crestron packet too short (< 5 bytes), waiting...")
                break

            # Length is read little-endian from bytes 2-3 and is total length - 3.
            # NOTE(review): this only differs from a big-endian length at bytes 1-2
            # when byte 3 is non-zero or the payload exceeds 255 bytes - confirm
            # against the protocol if such packets are ever seen.
            data_len = struct.unpack_from('<H', buffer, 2)[0]
            packet_len = data_len + 3
            if self.debug:
                self.log(f"Crestron packet: header={buffer[:min(10,len(buffer))].hex(':')}, data_len={data_len}, packet_len={packet_len}, buffer={len(buffer)}")
            if len(buffer) < packet_len:
                if self.debug:
                    self.log(f"Waiting for complete packet (need {packet_len}, have {len(buffer)})")
                break

            packet, buffer = buffer[:packet_len], buffer[packet_len:]
            if self.debug:
                self.log(f"Processing Crestron packet: {len(packet)} bytes, header: {packet[:10].hex(':')}")
            self.process_packet(packet)
        return buffer

    while self.running:
        conn = None
        retry_delay = 1  # short pause after an orderly close avoids a hot reconnect loop
        try:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket = conn  # published so send_command/terminate can use it
            conn.settimeout(10)
            conn.connect((self.ip, self.port))
            self.log("Connected to projector, performing handshake...")

            # Send initialization commands
            for command_key, note in handshake_steps:
                if self.debug:
                    self.log(note)
                conn.sendall(self.commands[command_key])
            self.log("Handshake complete, listening for updates...")

            # A short timeout turns recv() into the tick for keep-alive and watchdog.
            conn.settimeout(1.0)
            last_keepalive = self.get_now_ts()
            last_data_received = self.get_now_ts()
            buffer = b''

            while self.running:
                try:
                    data = conn.recv(4096)
                except socket.timeout:
                    now = self.get_now_ts()
                    if now - last_keepalive >= self.keepalive_interval:
                        if self.debug:
                            self.log("Sending keep-alive ping.")
                        try:
                            conn.sendall(self.commands["keepalive_ping"])
                            last_keepalive = now
                        except Exception as e:
                            self.log(f"Failed to send keepalive: {e}", level="WARNING")
                            break
                    if now - last_data_received > 30:
                        current_state = self.get_state(self.entity_id)
                        if current_state != "unavailable":
                            self.log("No data received for 30 seconds, marking projector as unavailable", level="WARNING")
                            attrs = self.get_state(self.entity_id, attribute="all").get("attributes", {})
                            self.set_state(self.entity_id, state="unavailable", attributes=attrs)
                    if now - last_data_received > 45:
                        self.log("No data received for 45 seconds, connection appears dead. Reconnecting...", level="WARNING")
                        break
                    continue

                if not data:
                    self.log("Connection closed by projector")
                    break
                last_data_received = self.get_now_ts()
                if self.debug:
                    self.log(f"Received {len(data)} bytes: {data[:50].hex(':')}")
                buffer = drain(buffer + data)
        except Exception as e:
            # Also reached when terminate() closes the socket; stay quiet then.
            if self.running:
                self.log(f"Connection error: {e}, reconnecting in 10s...", level="WARNING")
                retry_delay = 10
        finally:
            if conn is not None:
                try:
                    conn.close()
                except OSError:
                    pass  # best-effort cleanup; the socket is being discarded anyway
            self.socket = None
        pause(retry_delay)
    self.log("Listener thread stopped.")
def process_packet(self, packet):
    """Update Home Assistant entities from one packet sent by the projector.

    Args:
        packet: Raw bytes of a single ``0x05`` frame, as cut out of the
            receive buffer by the listener.

    Handled content, in order: freeze / AV-mute / active-source frames
    (9 bytes), the same frames plus volume embedded anywhere in a large
    (> 100 byte) status packet, a standalone 11-byte volume frame, then
    text fields (manufacturer/model, lamp hours, power status), source
    capabilities, and cooling / warming progress frames.
    """
    frame_prefix = b'\x05\x00\x06\x00\x00\x03\x00'
    volume_prefix = b'\x05\x00\x08\x00\x00\x05\x14\x13\x93'
    cooling_prefix = b'\x05\x00\x08\x00\x00\x05\x14\x13\x92'
    warming_prefix = b'\x05\x00\x08\x00\x00\x05\x14\x13\x91'
    embedded_tag = " (status packet)"

    def entity_attrs():
        """Return the media player's current attribute dict."""
        return self.get_state(self.entity_id, attribute="all").get("attributes", {})

    def update_attrs(**changes):
        """Merge *changes* into the attributes, keeping the current state."""
        state = self.get_state(self.entity_id)
        attrs = entity_attrs()
        attrs.update(changes)
        self.set_state(self.entity_id, state=state, attributes=attrs)

    def set_power_state(state):
        """Publish *state* for the media player with its existing attributes."""
        self.set_state(self.entity_id, state=state, attributes=entity_attrs())

    def apply_freeze(code, origin=""):
        """Publish the freeze switch for code 0xf1 (on) / 0xf0 (off)."""
        if code not in (0xf0, 0xf1):
            return False
        is_on = code == 0xf1
        self.set_state("switch.crestron_projector_freeze", state="on" if is_on else "off", attributes={
            "friendly_name": "Projector Freeze",
            "icon": "mdi:pause-circle"
        })
        if self.debug:
            self.log(f"Freeze status{origin}: {'ON' if is_on else 'OFF'}")
        return True

    def apply_av_mute(code, origin=""):
        """Publish the AV-mute switch for code 0xfd (on) / 0xfc (off)."""
        if code not in (0xfc, 0xfd):
            return False
        is_on = code == 0xfd
        self.set_state("switch.crestron_projector_av_mute", state="on" if is_on else "off", attributes={
            "friendly_name": "Projector AV Mute",
            "icon": "mdi:audio-video-off"
        })
        if self.debug:
            self.log(f"AV Mute status{origin}: {'ON' if is_on else 'OFF'}")
        return True

    def apply_source(code, origin=""):
        """Publish the active source when *code* is a known source code."""
        if code not in self.source_code_map:
            return False
        source_name = self.source_code_map[code]
        update_attrs(source=source_name)
        if self.debug:
            self.log(f"Source status{origin}: {source_name} (0x{code:02x})")
        return True

    def apply_volume(volume_bytes):
        """Decode a big-endian 16-bit volume and publish it."""
        volume_raw = struct.unpack('>H', volume_bytes)[0]
        # Convert to a percentage and snap to the nearest 5% when within 2%.
        volume_level_actual = volume_raw / 65535.0
        volume_percent = volume_level_actual * 100
        nearest_5 = round(volume_percent / 5) * 5
        if abs(volume_percent - nearest_5) <= 2:
            volume_level_display = nearest_5 / 100
        else:
            volume_level_display = volume_level_actual
        self.log(f"Volume update: raw={volume_raw} (0x{volume_bytes.hex()}) = {volume_level_display:.0%}")
        self.current_volume = volume_raw
        update_attrs(volume_raw=volume_raw, volume_level=volume_level_display)

    def embedded_codes(tail):
        """Yield the code byte of every 9-byte frame in *packet* ending in *tail*.

        Uses ``<=`` on the end bound so a frame that sits at the very end of
        the packet is not missed.
        """
        pos = packet.find(frame_prefix)
        while pos != -1 and pos + 9 <= len(packet):
            if packet[pos + 8] == tail:
                yield packet[pos + 7]
            pos = packet.find(frame_prefix, pos + 1)

    current_state = self.get_state(self.entity_id)
    if current_state == "unavailable":
        # Any packet proves the link is back; assume off until the status query answers.
        self.log("Projector connection restored - setting to off and querying actual status")
        set_power_state("off")
        self.run_in(lambda k: self.send_command("status"), 1.0)

    if self.debug and len(packet) < 100:
        hex_dump = packet.hex(':')
        ascii_dump = packet.decode(errors="ignore").replace('\x00', '·').replace('\x03', '│').replace('\x05', '┘')
        self.log(f"Packet [{len(packet)}B]: {hex_dump}")
        self.log(f" ASCII: {ascii_dump}")
    elif self.debug:
        self.log(f"Received status packet: {len(packet)} bytes")

    # Standalone 9-byte frames: 05:00:06:00:00:03:00:XX:YY
    if len(packet) == 9 and packet[:7] == frame_prefix:
        if packet[8] == 0x93:
            # Freeze (f1 on / f0 off) and AV mute (fd on / fc off) share this shape.
            apply_freeze(packet[7])
            apply_av_mute(packet[7])
        elif packet[8] == 0x13:
            # Active source, e.g. ...:cd:13 (Computer1), ...:d1:13 (HDMI)
            apply_source(packet[7])

    # Large status packets bundle several frames; pick out the first of each kind.
    if len(packet) > 100:
        # Volume: 05:00:08:00:00:05:14:13:93:XX:XX
        volume_at = packet.find(volume_prefix)
        if volume_at != -1 and volume_at + 11 <= len(packet):
            apply_volume(packet[volume_at + 9:volume_at + 11])
        for code in embedded_codes(0x93):
            if apply_freeze(code, embedded_tag):
                break
        for code in embedded_codes(0x93):
            if apply_av_mute(code, embedded_tag):
                break
        for code in embedded_codes(0x13):
            if apply_source(code, embedded_tag):
                break

    # Standalone volume frame: nothing else can be in it, so stop here.
    if len(packet) == 11 and packet[:9] == volume_prefix:
        apply_volume(packet[9:11])
        return

    # Power status and text data parsing
    response = packet.decode(errors="ignore")

    # Model information, pattern: MANUFACTURER:Projector:MODEL:VERSION
    manufacturer_match = re.search(r'([A-Z]+):Projector:([^:\x00]+)', response)
    if manufacturer_match:
        try:
            manufacturer = manufacturer_match.group(1).title()  # e.g., MITSUBISHI -> Mitsubishi
            model = manufacturer_match.group(2).strip()
            update_attrs(device_info={
                "identifiers": [["crestron_projector", self.ip]],
                "name": "Crestron Projector",
                "manufacturer": manufacturer,
                "model": model,
                "sw_version": "1.0"
            })
            if self.debug:
                self.log(f"Device detected: {manufacturer} {model}")
        except Exception as e:
            if self.debug:
                self.log(f"Error parsing model: {e}", level="WARNING")

    # Lamp hours, pattern: "1234 Hours"
    lamp_match = re.search(r'(\d+)\s*Hours', response)
    if lamp_match:
        try:
            lamp_hours = int(lamp_match.group(1))
            update_attrs(lamp_hours=lamp_hours)
            if self.debug:
                self.log(f"Lamp hours: {lamp_hours}")
        except Exception as e:
            if self.debug:
                self.log(f"Error parsing lamp hours: {e}", level="WARNING")

    # Collect available sources from capability packets (only until finalized).
    if not self.capabilities_detected:
        if self.debug:
            self.log(f"Checking for source capability in packet (length={len(packet)})")
        # Capability packet: 05:00:06:00:00:03:00:XX:YY where XX is the capability code
        if len(packet) >= 8 and packet[4] == 0x00 and packet[5] == 0x03 and packet[6] == 0x00:
            capability_code = packet[7]
            if capability_code in self.source_code_map:
                source_name = self.source_code_map[capability_code]
                if source_name not in self.detected_capabilities:
                    self.detected_capabilities.append(source_name)
                    if self.debug:
                        self.log(f"Detected source capability: {source_name} (code: {capability_code:02x})")
                # Arm the finalize timer once so further packets can still arrive.
                if not hasattr(self, 'capability_timer_set'):
                    self.capability_timer_set = True
                    self.run_in(self.finalize_capabilities, 2)

    # Re-read the state: it may have been changed above (e.g. unavailable -> off).
    current_state = self.get_state(self.entity_id)
    if "Power On" in response:
        if current_state not in ("on", "warming_up"):
            self.log("Projector power: ON")
            set_power_state("on")
    elif "Cooling Down" in response:
        if not current_state or not current_state.startswith("Cooling Down"):
            self.log("Projector power: COOLING DOWN")
            set_power_state("Cooling Down")
    elif "Warming Up" in response:
        if not current_state or not current_state.startswith("Warming Up"):
            self.log("Projector power: WARMING UP")
            set_power_state("Warming Up")
    elif "Power Off" in response:
        if current_state != "off":
            self.log("Projector power: OFF")
            set_power_state("off")

    if len(packet) == 11 and packet[:9] == cooling_prefix:
        # Cooling progress: 05:00:08:00:00:05:14:13:92:XX:XX, counting DOWN from ~65000 to 0.
        progress_bytes = struct.unpack('>H', packet[9:11])[0]
        progress_percent = min(int((progress_bytes / 65000.0) * 100), 100)
        attrs = entity_attrs()
        if progress_percent <= 2:
            # Counter is (nearly) exhausted: cooling is over.
            self.log("Cooling complete - transitioning to OFF")
            self.set_state(self.entity_id, state="off", attributes=attrs)
        else:
            attrs["cooling_progress"] = progress_percent
            self.set_state(self.entity_id, state=f"Cooling Down {progress_percent}%", attributes=attrs)
            self.log(f"Cooling progress: {progress_percent}% (raw: 0x{progress_bytes:04x})")
    elif len(packet) == 11 and packet[:9] == warming_prefix:
        # Warming progress: 05:00:08:00:00:05:14:13:91:XX:XX, counting UP from 0 to ~64000.
        progress_bytes = struct.unpack('>H', packet[9:11])[0]
        progress_percent = min(int((progress_bytes / 64000.0) * 100), 100)
        attrs = entity_attrs()
        current_state = self.get_state(self.entity_id)
        if progress_percent >= 100:
            self.log("Warming complete - transitioning to ON")
            self.set_state(self.entity_id, state="on", attributes=attrs)
        elif current_state != "on":
            attrs["warming_progress"] = progress_percent
            self.set_state(self.entity_id, state=f"Warming Up {progress_percent}%", attributes=attrs)
            self.log(f"Warming progress: {progress_percent}% (raw: 0x{progress_bytes:04x})")
def terminate(self):
    """Stop the listener and release the socket when the app is terminated.

    Clears ``self.running`` so the listener loop exits, closes the socket
    to unblock a pending receive, then waits up to 2 seconds for the
    listener thread to finish.
    """
    self.running = False

    conn = self.socket  # snapshot: the listener may reset the attribute concurrently
    if conn:
        try:
            conn.close()
        except OSError:
            pass  # best-effort: the socket is being discarded anyway

    worker = self.listener_thread
    # join() raises RuntimeError on a thread that was never started.
    if worker and worker.is_alive():
        worker.join(timeout=2)
def handle_service(self, event_name, data, kwargs):
    """Route a Home Assistant ``call_service`` event to the projector.

    Args:
        event_name: Name of the event (unused).
        data: Event payload; ``domain``, ``service`` and ``service_data``
            are read from it. ``service_data["entity_id"]`` may be a single
            entity id or a list of ids, and ``service_data`` may be absent.
        kwargs: Listener keyword arguments (unused).

    Events that do not target one of this app's entities are ignored.
    """
    domain = data.get("domain")
    service = data.get("service")
    service_data = data.get("service_data") or {}

    # Normalise the target to a list so string and list forms behave the same.
    targets = service_data.get("entity_id")
    if targets is None:
        targets = []
    elif isinstance(targets, str):
        targets = [targets]
    else:
        targets = list(targets)

    if domain == "media_player":
        if self.entity_id not in targets:
            return
        simple_actions = {
            "turn_on": self.turn_on,
            "turn_off": self.turn_off,
            "volume_up": self.volume_up,
            "volume_down": self.volume_down,
        }
        if service in simple_actions:
            simple_actions[service]()
        elif service == "volume_set":
            volume_level = service_data.get("volume_level")
            if volume_level is not None:
                self.volume_set(volume_level)
        elif service == "select_source":
            source = service_data.get("source")
            if source is not None:
                self.select_source(source)

    elif domain == "switch":
        # switch entity -> {service: projector command}
        switch_commands = {
            "switch.crestron_projector_freeze": {"turn_on": "freeze", "turn_off": "unfreeze"},
            "switch.crestron_projector_av_mute": {"turn_on": "av_mute", "turn_off": "av_unmute"},
        }
        for entity_id in targets:
            command = switch_commands.get(entity_id, {}).get(service)
            if command is None:
                continue
            self.send_command(command)
            # Query status twice so the projector's confirmation updates the switch.
            self.run_in(lambda k: self.send_command("status"), 0.5)
            self.run_in(lambda k: self.send_command("status"), 1.0)

    elif domain == "button" and service == "press":
        button_prefix = "button.crestron_projector_"
        button_commands = ("menu", "enter", "up", "down", "left", "right")
        for entity_id in targets:
            if not entity_id.startswith(button_prefix):
                continue
            command = entity_id[len(button_prefix):]
            if command in button_commands:
                self.send_command(command)
                self.update_button_state(entity_id)
def update_button_state(self, entity_id):
    """Stamp *entity_id* with the current time so the press is registered.

    The entity keeps its existing attributes; only the state changes, to
    the current local time in ISO format.
    """
    import datetime

    snapshot = self.get_state(entity_id, attribute="all")
    kept_attributes = snapshot.get("attributes", {})
    pressed_at = datetime.datetime.now().isoformat()
    self.set_state(entity_id, state=pressed_at, attributes=kept_attributes)
def send_command(self, command):
    """Send one pre-defined command to the projector.

    Args:
        command: Key into ``self.commands``.

    The persistent socket is used when one is open; otherwise a temporary
    connection is made. Every failure, including an unknown command name,
    is logged at ERROR level instead of being raised.
    """
    try:
        payload = self.commands[command]
        # Read the attribute once: the listener thread resets it to None on
        # disconnect, so a second read could fail between check and use.
        conn = self.socket
        if conn:
            conn.sendall(payload)
            self.log(f"Sent command: {command}")
        else:
            # Fallback to a temporary connection; the timeout keeps an
            # unreachable projector from blocking the caller indefinitely.
            with socket.create_connection((self.ip, self.port), timeout=5) as s:
                s.sendall(payload)
            self.log(f"Sent command: {command} via temporary connection")
    except Exception as e:
        self.log(f"Error sending command {command}: {e}", level="ERROR")
def turn_on(self, **kwargs):
    """Power the projector on unless it is currently cooling down."""
    state = self.get_state(self.entity_id)
    is_cooling = bool(state) and state.startswith("Cooling Down")
    if not is_cooling:
        self.send_command("power_on")
        return
    # Projector won't accept power on during cooldown
    self.log("Cannot turn on projector while cooling down", level="WARNING")
def turn_off(self, **kwargs):
    """Power the projector down by sending ``power_off``."""
    command_name = "power_off"
    self.send_command(command_name)
def volume_up(self, **kwargs):
    """Raise the volume one step by sending ``volume_up``."""
    command_name = "volume_up"
    self.send_command(command_name)
def volume_down(self, **kwargs):
    """Lower the volume one step by sending ``volume_down``."""
    command_name = "volume_down"
    self.send_command(command_name)
def volume_set(self, volume_level):
    """Set the volume to an absolute level.

    Args:
        volume_level: Target level, nominally 0.0 - 1.0. Values outside
            that range are clamped so the 16-bit raw value cannot wrap.

    Raises:
        TypeError, ValueError: If ``volume_level`` is not numeric.

    Send failures are logged at ERROR level instead of being raised.
    """
    level = min(max(float(volume_level), 0.0), 1.0)
    target_raw = int(level * 65535)
    if self.debug:
        self.log(f"Setting volume to {level:.2%} (raw: {target_raw}, 0x{target_raw:04x})")

    # Absolute volume command: 05:00:08:00:00:05:14:13:93:XX:XX (big-endian value)
    command = b'\x05\x00\x08\x00\x00\x05\x14\x13\x93' + struct.pack('>H', target_raw)
    try:
        # Read the attribute once: the listener thread may reset it to None.
        conn = self.socket
        if conn:
            conn.sendall(command)
            if self.debug:
                self.log(f"Sent absolute volume command: {command.hex(':')}")
        else:
            # Temporary connection with a timeout so an unreachable
            # projector cannot block the caller indefinitely.
            with socket.create_connection((self.ip, self.port), timeout=5) as s:
                s.sendall(command)
    except Exception as e:
        self.log(f"Error sending absolute volume command: {e}", level="ERROR")
def select_source(self, source):
    """Switch the projector to *source* if it is in the active source map.

    Unknown source names are reported with a warning and otherwise ignored.
    """
    if source not in self.source_map:
        self.log(f"Unknown source: {source}", level="WARNING")
        return

    command_key = self.source_map[source]
    self.send_command(command_key)
    if self.debug:
        self.log(f"Selected source: {source}")
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Guide available at https://community.home-assistant.io/t/crestron-room-view-projector-control/526151/10