Created
December 8, 2025 20:40
-
-
Save assimilat/62c5bd8dbc257844c2bfeb068398ad28 to your computer and use it in GitHub Desktop.
5131:2007 8 channel relay board control
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import os | |
| import json | |
| import time | |
| # Try to import hidapi | |
| try: | |
| import hid | |
| except ImportError: | |
| print("Error: 'hidapi' library is missing.") | |
| print("Please run: pip3 install hidapi") | |
| sys.exit(1) | |
| # ============================================================================== | |
| # USB HID Relay Control Script (CH554T / LCUS-over-HID Version) | |
| # Device: 5131:2007 (CH554T Chipset) | |
| # Protocol: LCUS Packets [0xA0, Channel, State, Checksum] wrapped in HID | |
| # ============================================================================== | |
| VID = 0x5131 | |
| PID = 0x2007 | |
| STATE_FILE = "/var/tmp/usb_relay_state.json" | |
| def load_state(): | |
| """Load the current state of all 8 relays from JSON.""" | |
| if os.path.exists(STATE_FILE): | |
| try: | |
| with open(STATE_FILE, 'r') as f: | |
| return json.load(f) | |
| except: | |
| pass | |
| # Default: All OFF | |
| return {str(i): False for i in range(1, 9)} | |
| def save_state(state): | |
| """Save the state to JSON.""" | |
| with open(STATE_FILE, 'w') as f: | |
| json.dump(state, f) | |
| def get_device(): | |
| """Find and open the relay device.""" | |
| try: | |
| # Search specifically for our VID/PID | |
| interfaces = hid.enumerate(VID, PID) | |
| if not interfaces: | |
| print(f"Error: Device {hex(VID)}:{hex(PID)} not found.") | |
| return None | |
| # Sort to ensure stability | |
| interfaces.sort(key=lambda x: x['path']) | |
| path = interfaces[0]['path'] | |
| # print(f"Connecting to: {path.decode('utf-8', 'ignore')}") | |
| h = hid.device() | |
| h.open_path(path) | |
| return h | |
| except Exception as e: | |
| print(f"Connection Error: {e}") | |
| return None | |
| def send_lcus_command(device, channel, turn_on): | |
| """ | |
| Sends command using LCUS protocol (Hex codes usually used over Serial). | |
| Structure: [A0, Channel, State, Checksum] | |
| """ | |
| cmd_byte = 0xA0 | |
| state_byte = 0x01 if turn_on else 0x00 | |
| # Checksum = Sum of first 3 bytes (masked to 8 bits) | |
| checksum = (cmd_byte + channel + state_byte) & 0xFF | |
| # Packet Construction | |
| # Byte 0: Report ID (0x00) - Required by hidapi for standard write | |
| # Byte 1: Start Byte (0xA0) | |
| # Byte 2: Channel | |
| # Byte 3: State | |
| # Byte 4: Checksum | |
| # Padding: Zeros to fill packet (usually 8 or 64 bytes, sending 8 is safe) | |
| packet = [0x00, cmd_byte, channel, state_byte, checksum, 0x00, 0x00, 0x00, 0x00] | |
| try: | |
| # Method 1: Standard Output Report (Write) | |
| # This is the most likely method for CH554T chips | |
| device.write(packet) | |
| return True | |
| except Exception as e: | |
| print(f"Write failed: {e}") | |
| # Method 2: Feature Report (Fallback) | |
| try: | |
| print("Attempting Feature Report fallback...") | |
| device.send_feature_report(packet) | |
| return True | |
| except Exception as e2: | |
| print(f"Feature Report failed: {e2}") | |
| return False | |
| def main(): | |
| if len(sys.argv) < 3: | |
| print("Usage: sudo ./relay.py <channel> <on|off|toggle>") | |
| print(" sudo ./relay.py all <on|off>") | |
| sys.exit(1) | |
| channel_arg = sys.argv[1].lower() | |
| action_arg = sys.argv[2].lower() | |
| # 1. Load current state | |
| state = load_state() | |
| # 2. Update State in Memory | |
| target_channels = [] | |
| if channel_arg == "all": | |
| target_channels = range(1, 9) | |
| else: | |
| try: | |
| ch = int(channel_arg) | |
| if not (1 <= ch <= 8): | |
| raise ValueError | |
| target_channels = [ch] | |
| except: | |
| print("Error: Channel must be 1-8") | |
| sys.exit(1) | |
| # 3. Connect to Device | |
| device = get_device() | |
| if not device: | |
| sys.exit(1) | |
| try: | |
| # Linux Kernel Driver Detach | |
| try: | |
| device.set_nonblocking(1) | |
| device.detach_kernel_driver(0) | |
| except: | |
| pass | |
| # 4. Process Commands | |
| for ch in target_channels: | |
| key = str(ch) | |
| should_turn_on = False | |
| if action_arg == "on": | |
| should_turn_on = True | |
| elif action_arg == "off": | |
| should_turn_on = False | |
| elif action_arg == "toggle": | |
| should_turn_on = not state.get(key, False) | |
| # Send Command | |
| if send_lcus_command(device, ch, should_turn_on): | |
| state[key] = should_turn_on | |
| print(f"Relay {ch} -> {'ON' if should_turn_on else 'OFF'}") | |
| # Small delay to prevent flooding the slow MCU if doing 'all' | |
| if len(target_channels) > 1: | |
| time.sleep(0.05) | |
| else: | |
| print(f"Failed to control Relay {ch}") | |
| save_state(state) | |
| finally: | |
| device.close() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment