Skip to content

Instantly share code, notes, and snippets.

@assimilat
Created December 8, 2025 20:40
Show Gist options
  • Select an option

  • Save assimilat/62c5bd8dbc257844c2bfeb068398ad28 to your computer and use it in GitHub Desktop.

Select an option

Save assimilat/62c5bd8dbc257844c2bfeb068398ad28 to your computer and use it in GitHub Desktop.
5131:2007 8 channel relay board control
#!/usr/bin/env python3
import sys
import os
import json
import time
# Try to import hidapi
try:
import hid
except ImportError:
print("Error: 'hidapi' library is missing.")
print("Please run: pip3 install hidapi")
sys.exit(1)
# ==============================================================================
# USB HID Relay Control Script (CH554T / LCUS-over-HID Version)
# Device: 5131:2007 (CH554T Chipset)
# Protocol: LCUS Packets [0xA0, Channel, State, Checksum] wrapped in HID
# ==============================================================================
# USB vendor / product ID pair handed to hid.enumerate() to locate the board.
VID = 0x5131
PID = 0x2007
# JSON file that remembers the last commanded on/off state of each relay
# between runs. Nothing in this script queries the device for its state, so
# the "toggle" action relies entirely on what is recorded here.
STATE_FILE = "/var/tmp/usb_relay_state.json"
def load_state(path=None):
    """Load the remembered on/off state of the 8 relays from the JSON state file.

    Args:
        path: File to read. Defaults to the module-level ``STATE_FILE``.

    Returns:
        The dict stored in the file (relay number as a string -> bool). If the
        file is missing, unreadable, not valid JSON, or does not contain a JSON
        object, a fresh dict marking relays "1".."8" as OFF is returned instead.
    """
    all_off = {str(i): False for i in range(1, 9)}
    if path is None:
        path = STATE_FILE

    # EAFP: just try to read the file. OSError covers missing/unreadable files,
    # ValueError covers malformed JSON (JSONDecodeError) and bad encodings.
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    try:
        with open(path, 'r') as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        return all_off

    # A syntactically valid file holding e.g. a list or a number is still
    # useless to callers, which expect dict semantics (state.get / state[key]).
    if not isinstance(loaded, dict):
        return all_off
    return loaded
def save_state(state, path=None):
    """Persist the relay state dict as JSON, replacing the file atomically.

    The data is first written to a sibling ``<path>.tmp`` file and then moved
    over the real file with ``os.replace``. If serialisation or the write
    fails part-way, the previous state file is left intact instead of being
    truncated.

    Args:
        state: JSON-serialisable mapping of relay number (str) -> bool.
        path: Destination file. Defaults to the module-level ``STATE_FILE``.

    Raises:
        OSError: If the file cannot be written or replaced.
        TypeError / ValueError: If ``state`` is not JSON-serialisable.
    """
    if path is None:
        path = STATE_FILE
    tmp_path = f"{path}.tmp"

    try:
        with open(tmp_path, 'w') as f:
            json.dump(state, f)
        # Same directory => same filesystem, so the rename is atomic.
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a partial temp file behind; the old state file has not
        # been touched at this point.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def get_device():
    """Locate the relay board by VID/PID and return an opened HID handle.

    Returns:
        An opened ``hid.device`` for the matching interface with the lowest
        path, or ``None`` (after printing a message) when no matching device
        is present or it cannot be opened.
    """
    try:
        matches = hid.enumerate(VID, PID)
        if not matches:
            print(f"Error: Device {hex(VID)}:{hex(PID)} not found.")
            return None

        # Always pick the interface with the smallest path so that repeated
        # runs talk to the same interface regardless of enumeration order.
        chosen = min(matches, key=lambda info: info['path'])

        handle = hid.device()
        handle.open_path(chosen['path'])
        return handle
    except Exception as e:
        print(f"Connection Error: {e}")
        return None
def send_lcus_command(device, channel, turn_on):
    """Send one LCUS relay command to the board, wrapped in a HID report.

    LCUS frame: ``[0xA0, channel, state, checksum]`` where ``checksum`` is the
    low 8 bits of the sum of the first three bytes. The frame is prefixed with
    report ID 0x00 (required by hidapi for a plain write) and zero-padded to
    9 bytes in total.

    The packet is first sent as an output report (``device.write``). If that
    raises or returns a negative value, the same packet is retried as a
    feature report.

    Args:
        device: Opened HID handle exposing ``write`` and
            ``send_feature_report``.
        channel: Relay number placed in the frame (callers pass 1-8).
        turn_on: Truthy to switch the relay on, falsy to switch it off.

    Returns:
        True if either transfer succeeded, False if both failed.
    """
    cmd_byte = 0xA0
    state_byte = 0x01 if turn_on else 0x00
    # Checksum = sum of the first 3 bytes, masked to 8 bits.
    checksum = (cmd_byte + channel + state_byte) & 0xFF
    # Byte 0: report ID, bytes 1-4: LCUS frame, remainder: zero padding.
    packet = [0x00, cmd_byte, channel, state_byte, checksum, 0x00, 0x00, 0x00, 0x00]

    # Method 1: standard output report.
    # hidapi bindings signal failure by returning a negative count rather than
    # raising, so the return value has to be checked as well.
    try:
        written = device.write(packet)
    except Exception as e:
        print(f"Write failed: {e}")
    else:
        if written is None or written >= 0:
            return True
        print(f"Write failed: device returned {written}")

    # Method 2: feature report fallback.
    print("Attempting Feature Report fallback...")
    try:
        sent = device.send_feature_report(packet)
    except Exception as e2:
        print(f"Feature Report failed: {e2}")
        return False
    if sent is None or sent >= 0:
        return True
    print(f"Feature Report failed: device returned {sent}")
    return False
def main():
    """Command-line entry point: ``relay.py <channel|all> <on|off|toggle>``.

    Validates both arguments, loads the remembered relay state, opens the
    device, sends one command per targeted channel, and finally saves the
    updated state. Only channels whose command was sent successfully have
    their remembered state changed.

    Exits with status 1 on a usage error, an invalid channel or action, or
    when the device cannot be opened.
    """
    if len(sys.argv) < 3:
        print("Usage: sudo ./relay.py <channel> <on|off|toggle>")
        print(" sudo ./relay.py all <on|off>")
        sys.exit(1)

    channel_arg = sys.argv[1].lower()
    action_arg = sys.argv[2].lower()

    # 1. Validate the arguments before touching state or hardware.
    # An unknown action must be rejected: silently treating it as "off" would
    # switch relays on a typo.
    if action_arg not in ("on", "off", "toggle"):
        print("Error: Action must be on, off or toggle")
        sys.exit(1)

    if channel_arg == "all":
        target_channels = range(1, 9)
    else:
        try:
            ch = int(channel_arg)
        except ValueError:
            ch = None
        if ch is None or not (1 <= ch <= 8):
            print("Error: Channel must be 1-8")
            sys.exit(1)
        target_channels = [ch]

    # 2. Load current state
    state = load_state()

    # 3. Connect to Device
    device = get_device()
    if not device:
        sys.exit(1)

    try:
        # Best-effort setup: not every hidapi backend/handle offers these
        # calls, and the relay can still be driven without them, so failures
        # here are intentionally ignored.
        try:
            device.set_nonblocking(1)
            device.detach_kernel_driver(0)
        except Exception:
            pass

        # 4. Process Commands
        for ch in target_channels:
            key = str(ch)
            if action_arg == "toggle":
                should_turn_on = not state.get(key, False)
            else:
                should_turn_on = action_arg == "on"

            if send_lcus_command(device, ch, should_turn_on):
                state[key] = should_turn_on
                print(f"Relay {ch} -> {'ON' if should_turn_on else 'OFF'}")
                # Small delay to prevent flooding the slow MCU if doing 'all'
                if len(target_channels) > 1:
                    time.sleep(0.05)
            else:
                print(f"Failed to control Relay {ch}")

        save_state(state)
    finally:
        device.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment