Created
October 12, 2025 13:53
-
-
Save ohadlevy/1cf4090786ca8b5e49669ccc8ff6effe to your computer and use it in GitHub Desktop.
c1000x capture
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Capture MQTT commands sent by mobile app to understand C1000X control protocol.""" | |
| import asyncio | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| from aiohttp import ClientSession | |
| from api.api import AnkerSolixApi | |
| from api.mqtttypes import DeviceHexData | |
| import common | |
| _LOGGER = logging.getLogger(__name__) | |
| CONSOLE = common.CONSOLE | |
class CommandCapture:
    """Capture and analyze MQTT commands.

    Each message passed to :meth:`capture_all_messages` is converted into a
    dict and filed by topic:

    * ``captured_commands`` - topics containing ``"cmd/"``
    * ``captured_data`` - topics containing ``"dt/"``

    Messages whose topic contains neither marker are classified as
    ``"UNKNOWN"`` and are not stored in either list.
    """

    # Key in the decoded values that carries the AC output switch state.
    _AC_SWITCH_KEY = "switch_ac_output_power"

    def __init__(self) -> None:
        self.captured_commands = []  # entries whose topic contains "cmd/"
        self.captured_data = []  # entries whose topic contains "dt/"

    def capture_all_messages(self, session, topic, message, data, model, device_sn, timestamp) -> None:
        """Capture all MQTT messages (both commands and data).

        Args:
            session: Unused; accepted to match the callback signature.
            topic: MQTT topic the message arrived on; used for classification.
            message: Message content, stored unchanged in the entry.
            data: Raw payload. Only ``bytes`` payloads are hex-dumped and
                decoded; anything else leaves ``data_hex`` and
                ``data_decoded`` as ``None``.
            model: Device model passed to the decoder (``""`` when falsy).
            device_sn: Device serial number, stored unchanged in the entry.
            timestamp: Unused; the entry is stamped with the local time at
                which this callback runs.
        """
        payload = data if isinstance(data, bytes) else None
        msg_entry = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "topic": topic,
            "device_sn": device_sn,
            "model": model,
            "message": message,
            "data_hex": payload.hex(":") if payload is not None else None,
            "data_decoded": None,
        }

        # Decode the payload once and reuse the result for both the stored
        # entry and the AC switch report below.
        values = None
        if payload is not None:
            try:
                hd = DeviceHexData(model=model or "", hexbytes=payload)
                decoded = {
                    "header": {
                        "msg_type": hd.msg_header.msgtype.hex(),
                        "length": hd.msg_header.msglength,
                        "pattern": hd.msg_header.pattern.hex(),
                    },
                    "fields": {name: field.hex() for name, field in hd.msg_fields.items()},
                    "values": hd.values(),
                }
            except Exception as err:
                # Best effort: an undecodable payload must not stop the
                # capture, so keep the raw hex and record why decoding failed.
                msg_entry["decode_error"] = str(err)
            else:
                msg_entry["data_decoded"] = decoded
                values = decoded["values"]

        # Classify message type
        if "cmd/" in topic:
            msg_entry["direction"] = "COMMAND (Sent to device)"
            self.captured_commands.append(msg_entry)
            CONSOLE.info(f"π΄ COMMAND CAPTURED: {topic}")
        elif "dt/" in topic:
            msg_entry["direction"] = "DATA (Received from device)"
            self.captured_data.append(msg_entry)
            CONSOLE.info(f"π΅ DATA CAPTURED: {topic}")
        else:
            # Neither marker matched: the entry is labelled but not stored.
            msg_entry["direction"] = "UNKNOWN"

        # Show key state changes
        if isinstance(values, dict) and self._AC_SWITCH_KEY in values:
            state = values[self._AC_SWITCH_KEY]
            CONSOLE.info(f" π AC SWITCH STATE: {state} ({'ON' if state == 1 else 'OFF'})")
def _collect_ac_states(data_messages):
    """Return ``(timestamp, state)`` pairs for entries reporting the AC switch."""
    ac_states = []
    for entry in data_messages:
        # "data_decoded" is always present and is None when the payload could
        # not be decoded, so a .get() default would never apply; use `or {}`.
        values = (entry.get("data_decoded") or {}).get("values") or {}
        state = values.get("switch_ac_output_power")
        if state is not None:
            ac_states.append((entry["timestamp"], state))
    return ac_states


async def main():
    """Run the command capture.

    Steps:
      1. Log in and pick the first device whose ``device_pn`` is ``"A1761"``.
      2. Start an MQTT session and subscribe to the device's data and
         command topic prefixes.
      3. Wait 5 seconds, then monitor for 60 seconds while the user operates
         the mobile app, logging message counts every 10 seconds.
      4. Save everything captured to ``mqttdumps/command_capture_<time>.json``
         and log a summary.

    Returns early (after logging) when no matching device is found or the
    MQTT session cannot be started.
    """
    async with ClientSession() as websession:
        # Initialize API
        myapi = AnkerSolixApi(
            common.user(), common.password(), common.country(), websession, _LOGGER
        )

        # Find C1000X device
        await myapi.update_sites()
        await myapi.update_device_details()
        device_sn = None
        for sn, device in myapi.devices.items():
            if device.get("device_pn") == "A1761":
                device_sn = sn
                CONSOLE.info(f"Found C1000X device: {sn}")
                break
        if not device_sn:
            CONSOLE.info("No C1000X device found")
            return

        # Start MQTT session
        mqtt_session = await myapi.startMqttSession()
        if not mqtt_session:
            CONSOLE.info("Failed to start MQTT session")
            return

        # Set up capture
        capture = CommandCapture()
        device_dict = {"device_sn": device_sn, "device_pn": "A1761"}

        # Subscribe to BOTH directions
        topics = set()
        # Data messages (device to app)
        if prefix := mqtt_session.get_topic_prefix(deviceDict=device_dict, publish=False):
            topics.add(f"{prefix}#")
        # Command messages (app to device)
        if cmd_prefix := mqtt_session.get_topic_prefix(deviceDict=device_dict, publish=True):
            topics.add(f"{cmd_prefix}#")

        CONSOLE.info("\n" + "=" * 80)
        CONSOLE.info("π― COMMAND CAPTURE MODE")
        CONSOLE.info("=" * 80)
        CONSOLE.info("This will capture ALL MQTT traffic for your C1000X device.")
        CONSOLE.info("Topics monitored:")
        for topic in topics:
            CONSOLE.info(f" π‘ {topic}")
        CONSOLE.info("\nπ± NOW USE YOUR MOBILE APP TO:")
        CONSOLE.info(" 1. Turn AC output OFF")
        CONSOLE.info(" 2. Wait 5 seconds")
        CONSOLE.info(" 3. Turn AC output ON")
        CONSOLE.info(" 4. Wait 5 seconds")
        CONSOLE.info(" 5. Turn AC output OFF")
        CONSOLE.info("\nβ±οΈ I will capture for 60 seconds, then analyze the commands...")

        # Start message poller
        # NOTE(review): timeout=120 is forwarded to the poller unchanged; its
        # exact meaning is defined by message_poller - confirm.
        poller_task = asyncio.create_task(
            mqtt_session.message_poller(
                topics=topics,
                trigger_devices={device_sn},
                msg_callback=capture.capture_all_messages,
                timeout=120,
            )
        )
        try:
            # Wait for captures
            await asyncio.sleep(5)
            CONSOLE.info("π’ Capture started - Use mobile app NOW!")

            # Monitor for 60 seconds
            for i in range(60):
                await asyncio.sleep(1)
                if i % 10 == 9:
                    cmd_count = len(capture.captured_commands)
                    data_count = len(capture.captured_data)
                    CONSOLE.info(f" π Captured: {cmd_count} commands, {data_count} data messages ({i+1}/60s)")
        finally:
            # Stop capture - also when monitoring is interrupted, so the
            # background poller is never left running.
            poller_task.cancel()
            try:
                await poller_task
            except asyncio.CancelledError:
                pass

        commands = capture.captured_commands
        data_messages = capture.captured_data

        # Analysis
        CONSOLE.info("\n" + "=" * 80)
        CONSOLE.info("π CAPTURE ANALYSIS")
        CONSOLE.info("=" * 80)

        # Save captures to file
        capture_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = f"command_capture_{capture_stamp}.json"
        capture_data = {
            "capture_timestamp": capture_stamp,
            "device_sn": device_sn,
            "commands": commands,
            "data_messages": data_messages,
            "summary": {
                "total_commands": len(commands),
                "total_data": len(data_messages),
                "total_messages": len(commands) + len(data_messages),
            },
        }

        # Ensure output directory exists
        output_dir = Path("mqttdumps")
        output_dir.mkdir(exist_ok=True)
        # Explicit encoding so the dump does not depend on the platform default.
        with open(output_dir / filename, "w", encoding="utf-8") as f:
            json.dump(capture_data, f, indent=2)

        CONSOLE.info(f"πΎ Capture saved to: {output_dir / filename}")
        CONSOLE.info("π Summary:")
        CONSOLE.info(f" π΄ Commands captured: {len(commands)}")
        CONSOLE.info(f" π΅ Data messages captured: {len(data_messages)}")

        if commands:
            CONSOLE.info("\nπ΄ COMMAND MESSAGES (These are what we need!):")
            for i, cmd in enumerate(commands, 1):
                CONSOLE.info(f" {i}. {cmd['timestamp']} - {cmd['topic']}")
                if cmd.get("data_decoded"):
                    msg_type = cmd["data_decoded"]["header"]["msg_type"]
                    fields = cmd["data_decoded"]["fields"]
                    CONSOLE.info(f" Message Type: {msg_type}")
                    CONSOLE.info(f" Fields: {fields}")
        else:
            CONSOLE.info("\nβ οΈ NO COMMAND MESSAGES CAPTURED!")
            CONSOLE.info("This could mean:")
            CONSOLE.info(" - Mobile app uses different protocol")
            CONSOLE.info(" - Commands go through different MQTT topics")
            CONSOLE.info(" - Need to capture at different network level")

        if data_messages:
            CONSOLE.info("\nπ΅ DATA MESSAGES (device responses):")
            ac_states = _collect_ac_states(data_messages)
            if ac_states:
                CONSOLE.info(" AC Switch State Changes:")
                for seen_at, state in ac_states:
                    CONSOLE.info(f" {seen_at}: {state} ({'ON' if state == 1 else 'OFF'})")
            else:
                CONSOLE.info(" No AC switch state changes detected")

        CONSOLE.info("\nπ Next Steps:")
        if commands:
            CONSOLE.info("β We captured commands! Analyze the message structure in the saved file.")
            CONSOLE.info("π§ Look for the exact message type and field structure used by mobile app.")
        else:
            CONSOLE.info("β No commands captured. Try these alternatives:")
            CONSOLE.info(" 1. Use network packet capture (Wireshark)")
            CONSOLE.info(" 2. Check if mobile app uses different MQTT topics")
            CONSOLE.info(" 3. Analyze mobile app binary for protocol details")
if __name__ == "__main__":
    # Script entry point: run the capture and report any failure through the
    # console logger rather than letting a traceback escape.
    try:
        asyncio.run(main())
    except Exception as err:
        CONSOLE.info(f"{type(err)}: {err}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment