Created
November 5, 2025 20:35
-
-
Save osnr/ad87687b5e1364cf43c9fbdd8e8590a5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Projector Serial Controller | |
| Python terminal program to communicate with projector over serial port | |
| Based on decompiled Android app functionality | |
| (AI-translated from decompiled JAR of https://play.google.com/store/apps/details?id=com.anybeam.osd&hl=en_US) | |
| (com.anybeam.osd.UsbService and com.anybeam.osd.MainActivity) | |
| """ | |
| import serial | |
| import time | |
| import threading | |
| from typing import Optional | |
| class ProjectorController: | |
| def __init__(self, port: str = '/dev/ttyACM0', baudrate: int = 115200): | |
| self.port = port | |
| self.baudrate = baudrate | |
| self.serial_conn: Optional[serial.Serial] = None | |
| self.running = False | |
| self.read_thread: Optional[threading.Thread] = None | |
| self.rx_buffer = bytearray() | |
| self.flip_direction = 1 | |
| def connect(self) -> bool: | |
| """Connect to the serial port""" | |
| try: | |
| self.serial_conn = serial.Serial( | |
| port=self.port, | |
| baudrate=self.baudrate, | |
| bytesize=serial.EIGHTBITS, | |
| parity=serial.PARITY_NONE, | |
| stopbits=serial.STOPBITS_ONE, | |
| timeout=1 | |
| ) | |
| self.running = True | |
| self.read_thread = threading.Thread(target=self._read_loop, daemon=True) | |
| self.read_thread.start() | |
| print(f"Connected to {self.port} at {self.baudrate} baud") | |
| return True | |
| except Exception as e: | |
| print(f"Failed to connect: {e}") | |
| return False | |
| def disconnect(self): | |
| """Disconnect from the serial port""" | |
| self.running = False | |
| if self.read_thread: | |
| self.read_thread.join(timeout=2) | |
| if self.serial_conn: | |
| self.serial_conn.close() | |
| self.serial_conn = None | |
| print("Disconnected") | |
| def _read_loop(self): | |
| """Background thread to read incoming data""" | |
| while self.running and self.serial_conn: | |
| try: | |
| if self.serial_conn.in_waiting > 0: | |
| data = self.serial_conn.read(self.serial_conn.in_waiting) | |
| self._process_received_data(data) | |
| time.sleep(0.01) | |
| except Exception as e: | |
| if self.running: | |
| print(f"Read error: {e}") | |
| break | |
| def _process_received_data(self, data: bytes): | |
| """Process received data and extract complete packets""" | |
| self.rx_buffer.extend(data) | |
| print(f"Received: {self._bytes_to_hex(data)}") | |
| # Look for complete packets starting with 0xFF | |
| while len(self.rx_buffer) >= 5: # Minimum packet size | |
| # Find start of packet (0xFF) | |
| start_idx = -1 | |
| for i, byte in enumerate(self.rx_buffer): | |
| if byte == 0xFF: | |
| start_idx = i | |
| break | |
| if start_idx == -1: | |
| # No packet start found, clear buffer | |
| self.rx_buffer.clear() | |
| break | |
| if start_idx > 0: | |
| # Remove data before packet start | |
| self.rx_buffer = self.rx_buffer[start_idx:] | |
| if len(self.rx_buffer) >= 5: | |
| # Extract packet (assuming 5-byte packets based on Android code) | |
| packet = bytes(self.rx_buffer[:5]) | |
| self.rx_buffer = self.rx_buffer[5:] | |
| print(f"Complete packet: {self._bytes_to_hex(packet)}") | |
| else: | |
| break | |
| def _calculate_checksum(self, data: bytes, start: int, length: int) -> int: | |
| """Calculate checksum as sum of bytes""" | |
| checksum = 0 | |
| for i in range(start, start + length): | |
| if i < len(data): | |
| checksum += data[i] | |
| return checksum & 0xFF | |
| def _bytes_to_hex(self, data: bytes) -> str: | |
| """Convert bytes to hex string for display""" | |
| return ' '.join(f'0x{b:02X}' for b in data) | |
| def _send_command(self, data: bytes): | |
| """Send command to projector""" | |
| if not self.serial_conn: | |
| print("Not connected!") | |
| return | |
| try: | |
| print(f"Sending: {self._bytes_to_hex(data)}") | |
| self.serial_conn.write(data) | |
| self.serial_conn.flush() | |
| except Exception as e: | |
| print(f"Send error: {e}") | |
| def cmd_get_setting(self, setting_id: int): | |
| """Get projector setting (based on cmd_get_setting from Android code)""" | |
| # Command: [0xFF, 0x52, setting_id, checksum] | |
| cmd = bytearray([0xFF, 0x52, setting_id, 0x00]) | |
| cmd[3] = self._calculate_checksum(cmd, 0, 3) | |
| self._send_command(bytes(cmd)) | |
| def cmd_osd_button(self, button_code: int): | |
| """Send OSD button command (based on cmd_osd_button from Android code)""" | |
| # Command: [0xFF, 0x57, 0x49, button_code, checksum] | |
| cmd = bytearray([0xFF, 0x57, 0x49, button_code, 0x00]) | |
| cmd[4] = self._calculate_checksum(cmd, 1, 3) | |
| self._send_command(bytes(cmd)) | |
| def cmd_flip(self, direction: int): | |
| """Send flip command (based on cmd_flip from Android code)""" | |
| # Command: [0xFF, 0x57, 0x03, direction, checksum] | |
| cmd = bytearray([0xFF, 0x57, 0x03, direction, 0x00]) | |
| cmd[4] = self._calculate_checksum(cmd, 1, 3) | |
| self._send_command(bytes(cmd)) | |
| def osd_menu(self): | |
| """OSD Menu button (button code 89/0x59)""" | |
| print("OSD Menu") | |
| self.cmd_osd_button(89) | |
| def osd_up(self): | |
| """OSD Up button (button code 122/0x7A)""" | |
| print("OSD Up") | |
| self.cmd_osd_button(122) | |
| def osd_enter(self): | |
| """OSD Enter button (button code 121/0x79)""" | |
| print("OSD Enter") | |
| self.cmd_osd_button(121) | |
| def osd_down(self): | |
| """OSD Down button (button code 120/0x78)""" | |
| print("OSD Down") | |
| self.cmd_osd_button(120) | |
| def flip_image(self): | |
| """Flip image orientation""" | |
| print(f"Flip direction: {self.flip_direction}") | |
| self.cmd_flip(self.flip_direction) | |
| self.flip_direction += 1 | |
| if self.flip_direction == 4: | |
| self.flip_direction = 0 | |
| def print_menu(): | |
| """Print the command menu""" | |
| print("\n" + "="*50) | |
| print("PROJECTOR CONTROLLER") | |
| print("="*50) | |
| print("1. OSD Menu") | |
| print("2. OSD Up") | |
| print("3. OSD Enter") | |
| print("4. OSD Down") | |
| print("5. Flip Image") | |
| print("6. Get Setting (enter ID)") | |
| print("c. Connect") | |
| print("d. Disconnect") | |
| print("q. Quit") | |
| print("="*50) | |
| def main(): | |
| controller = ProjectorController() | |
| print("Projector Serial Controller") | |
| print("Default port: /dev/ttyACM0") | |
| while True: | |
| print_menu() | |
| choice = input("Enter choice: ").strip().lower() | |
| if choice == 'q': | |
| break | |
| elif choice == 'c': | |
| port = input("Enter serial port [/dev/ttyACM0]: ").strip() | |
| if not port: | |
| port = '/dev/ttyACM0' | |
| controller.port = port | |
| controller.connect() | |
| elif choice == 'd': | |
| controller.disconnect() | |
| elif choice == '1': | |
| controller.osd_menu() | |
| elif choice == '2': | |
| controller.osd_up() | |
| elif choice == '3': | |
| controller.osd_enter() | |
| elif choice == '4': | |
| controller.osd_down() | |
| elif choice == '5': | |
| controller.flip_image() | |
| elif choice == '6': | |
| try: | |
| setting_id = int(input("Enter setting ID (0-255): ")) | |
| if 0 <= setting_id <= 255: | |
| controller.cmd_get_setting(setting_id) | |
| else: | |
| print("Setting ID must be 0-255") | |
| except ValueError: | |
| print("Invalid setting ID") | |
| else: | |
| print("Invalid choice") | |
| time.sleep(0.1) | |
| controller.disconnect() | |
| print("Goodbye!") | |
| if __name__ == "__main__": | |
| main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment