Skip to content

Instantly share code, notes, and snippets.

@ohadlevy
Created October 12, 2025 13:53
Show Gist options
  • Select an option

  • Save ohadlevy/1cf4090786ca8b5e49669ccc8ff6effe to your computer and use it in GitHub Desktop.

Select an option

Save ohadlevy/1cf4090786ca8b5e49669ccc8ff6effe to your computer and use it in GitHub Desktop.
c1000x capture
#!/usr/bin/env python3
"""Capture MQTT commands sent by mobile app to understand C1000X control protocol."""
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from aiohttp import ClientSession
from api.api import AnkerSolixApi
from api.mqtttypes import DeviceHexData
import common
# Module logger; handed to the AnkerSolixApi client constructed in main().
_LOGGER = logging.getLogger(__name__)
# Console logger provided by the project's `common` helper module; every
# user-facing line in this script is emitted through CONSOLE.info().
CONSOLE = common.CONSOLE
class CommandCapture:
    """Collect MQTT messages seen during a capture run, split by direction.

    Attributes:
        captured_commands: Entries for messages whose topic contains ``cmd/``.
        captured_data: Entries for messages whose topic contains ``dt/``.
    """

    def __init__(self):
        self.captured_commands = []
        self.captured_data = []

    @staticmethod
    def _decode_payload(model, data):
        """Decode raw payload bytes with DeviceHexData into a plain dict.

        Returns a dict with the message ``header``, the hex of every ``fields``
        entry and the decoded ``values``. Any exception raised while decoding
        propagates to the caller.
        """
        hd = DeviceHexData(model=model or "", hexbytes=data)
        return {
            "header": {
                "msg_type": hd.msg_header.msgtype.hex(),
                "length": hd.msg_header.msglength,
                "pattern": hd.msg_header.pattern.hex(),
            },
            "fields": {name: field.hex() for name, field in hd.msg_fields.items()},
            "values": hd.values(),
        }

    def capture_all_messages(self, session, topic, message, data, model, device_sn, timestamp):
        """Record one MQTT message (command or data) and log what was seen.

        Used as the ``msg_callback`` of the MQTT message poller.

        Args:
            session: Unused; accepted to match the callback signature.
            topic: MQTT topic the message arrived on; decides the direction.
            message: Message content, stored verbatim in the entry.
            data: Raw payload; only decoded when it is a ``bytes`` object.
            model: Device model passed to the decoder (``""`` if falsy).
            device_sn: Serial number of the device the message belongs to.
            timestamp: Unused; the entry is stamped with the local capture time.
        """
        is_binary = isinstance(data, bytes)
        msg_entry = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "topic": topic,
            "device_sn": device_sn,
            "model": model,
            "message": message,
            "data_hex": data.hex(":") if is_binary else None,
            "data_decoded": None,
        }

        # Decode the payload exactly once; the result is reused below for the
        # AC switch report instead of running the decoder a second time.
        if is_binary:
            try:
                msg_entry["data_decoded"] = self._decode_payload(model, data)
            except Exception as err:  # callback boundary: never let a bad payload kill the poller
                msg_entry["decode_error"] = str(err)

        # Classify by topic. Messages matching neither pattern are tagged
        # UNKNOWN and not stored in either list.
        if "cmd/" in topic:
            msg_entry["direction"] = "COMMAND (Sent to device)"
            self.captured_commands.append(msg_entry)
            CONSOLE.info(f"🔴 COMMAND CAPTURED: {topic}")
        elif "dt/" in topic:
            msg_entry["direction"] = "DATA (Received from device)"
            self.captured_data.append(msg_entry)
            CONSOLE.info(f"🔵 DATA CAPTURED: {topic}")
        else:
            msg_entry["direction"] = "UNKNOWN"

        # Show key state changes
        values = (msg_entry["data_decoded"] or {}).get("values") or {}
        if "switch_ac_output_power" in values:
            state = values["switch_ac_output_power"]
            CONSOLE.info(f" 🔌 AC SWITCH STATE: {state} ({'ON' if state == 1 else 'OFF'})")
# Product number this script treats as the C1000X.
_C1000X_PN = "A1761"
# Seconds to wait after starting the poller before announcing the capture.
_WARMUP_SECONDS = 5
# Length of the monitored capture window in seconds.
_CAPTURE_SECONDS = 60
# Timeout handed to the MQTT message poller.
_POLLER_TIMEOUT = 120
# Directory (relative to the working directory) that receives the JSON dump.
_OUTPUT_DIR = Path("mqttdumps")


def _find_c1000x_sn(devices):
    """Return the serial of the first device whose ``device_pn`` is the C1000X, else None."""
    return next(
        (sn for sn, device in devices.items() if device.get("device_pn") == _C1000X_PN),
        None,
    )


def _capture_topics(mqtt_session, device_sn):
    """Build the wildcard topics covering both directions of the device's traffic."""
    device_dict = {"device_sn": device_sn, "device_pn": _C1000X_PN}
    topics = set()
    # publish=False -> data messages (device to app)
    # publish=True  -> command messages (app to device)
    for publish in (False, True):
        if prefix := mqtt_session.get_topic_prefix(deviceDict=device_dict, publish=publish):
            topics.add(f"{prefix}#")
    return topics


def _print_instructions(topics):
    """Tell the user which topics are monitored and what to do in the mobile app."""
    CONSOLE.info("\n" + "=" * 80)
    CONSOLE.info("🎯 COMMAND CAPTURE MODE")
    CONSOLE.info("=" * 80)
    CONSOLE.info("This will capture ALL MQTT traffic for your C1000X device.")
    CONSOLE.info("Topics monitored:")
    for topic in topics:
        CONSOLE.info(f" 📡 {topic}")
    CONSOLE.info("\n📱 NOW USE YOUR MOBILE APP TO:")
    CONSOLE.info(" 1. Turn AC output OFF")
    CONSOLE.info(" 2. Wait 5 seconds")
    CONSOLE.info(" 3. Turn AC output ON")
    CONSOLE.info(" 4. Wait 5 seconds")
    CONSOLE.info(" 5. Turn AC output OFF")
    CONSOLE.info(
        f"\n⏱️ I will capture for {_CAPTURE_SECONDS} seconds, then analyze the commands..."
    )


def _save_capture(capture, device_sn):
    """Write everything captured so far to a timestamped JSON file and return its path."""
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    command_total = len(capture.captured_commands)
    data_total = len(capture.captured_data)
    capture_data = {
        "capture_timestamp": stamp,
        "device_sn": device_sn,
        "commands": capture.captured_commands,
        "data_messages": capture.captured_data,
        "summary": {
            "total_commands": command_total,
            "total_data": data_total,
            "total_messages": command_total + data_total,
        },
    }
    _OUTPUT_DIR.mkdir(exist_ok=True)
    target = _OUTPUT_DIR / f"command_capture_{stamp}.json"
    # Explicit UTF-8 so the dump does not depend on the platform default encoding.
    with open(target, "w", encoding="utf-8") as f:
        # default=str is deliberate: this is a diagnostic dump, so a value json
        # cannot encode is stringified rather than aborting the whole file.
        json.dump(capture_data, f, indent=2, default=str)
    return target


def _ac_switch_states(data_messages):
    """Return ``(timestamp, state)`` for every entry that reports the AC switch.

    ``data_decoded`` is always present in an entry but is ``None`` when the
    payload was not decoded, so ``or {}`` is required here; a ``.get`` default
    would not apply to a key that exists.
    """
    states = []
    for entry in data_messages:
        decoded = entry.get("data_decoded") or {}
        values = decoded.get("values") or {}
        state = values.get("switch_ac_output_power")
        if state is not None:
            states.append((entry["timestamp"], state))
    return states


def _print_analysis(capture):
    """Log a summary of captured commands, AC switch changes and suggested next steps."""
    commands = capture.captured_commands
    data_messages = capture.captured_data

    CONSOLE.info("📈 Summary:")
    CONSOLE.info(f" 🔴 Commands captured: {len(commands)}")
    CONSOLE.info(f" 🔵 Data messages captured: {len(data_messages)}")

    if commands:
        CONSOLE.info("\n🔴 COMMAND MESSAGES (These are what we need!):")
        for i, cmd in enumerate(commands, 1):
            CONSOLE.info(f" {i}. {cmd['timestamp']} - {cmd['topic']}")
            if cmd.get("data_decoded"):
                msg_type = cmd["data_decoded"]["header"]["msg_type"]
                fields = cmd["data_decoded"]["fields"]
                CONSOLE.info(f" Message Type: {msg_type}")
                CONSOLE.info(f" Fields: {fields}")
    else:
        CONSOLE.info("\n⚠️ NO COMMAND MESSAGES CAPTURED!")
        CONSOLE.info("This could mean:")
        CONSOLE.info(" - Mobile app uses different protocol")
        CONSOLE.info(" - Commands go through different MQTT topics")
        CONSOLE.info(" - Need to capture at different network level")

    if data_messages:
        CONSOLE.info("\n🔵 DATA MESSAGES (device responses):")
        ac_states = _ac_switch_states(data_messages)
        if ac_states:
            CONSOLE.info(" AC Switch State Changes:")
            for seen_at, state in ac_states:
                CONSOLE.info(f" {seen_at}: {state} ({'ON' if state == 1 else 'OFF'})")
        else:
            CONSOLE.info(" No AC switch state changes detected")

    CONSOLE.info("\n🔍 Next Steps:")
    if commands:
        CONSOLE.info("✅ We captured commands! Analyze the message structure in the saved file.")
        CONSOLE.info("🔧 Look for the exact message type and field structure used by mobile app.")
    else:
        CONSOLE.info("❌ No commands captured. Try these alternatives:")
        CONSOLE.info(" 1. Use network packet capture (Wireshark)")
        CONSOLE.info(" 2. Check if mobile app uses different MQTT topics")
        CONSOLE.info(" 3. Analyze mobile app binary for protocol details")


async def main():
    """Run the command capture.

    Logs in, locates the C1000X device, subscribes to its data and command
    topics, records MQTT traffic for the capture window while the user toggles
    the AC output in the mobile app, then saves and summarises what was seen.
    Returns early (after logging the reason) when no device is found or the
    MQTT session cannot be started.
    """
    async with ClientSession() as websession:
        # Initialize API
        myapi = AnkerSolixApi(
            common.user(), common.password(), common.country(), websession, _LOGGER
        )

        # Find C1000X device
        await myapi.update_sites()
        await myapi.update_device_details()
        device_sn = _find_c1000x_sn(myapi.devices)
        if not device_sn:
            CONSOLE.info("No C1000X device found")
            return
        CONSOLE.info(f"Found C1000X device: {device_sn}")

        # Start MQTT session
        mqtt_session = await myapi.startMqttSession()
        if not mqtt_session:
            CONSOLE.info("Failed to start MQTT session")
            return

        # Set up capture on BOTH directions
        capture = CommandCapture()
        topics = _capture_topics(mqtt_session, device_sn)
        _print_instructions(topics)

        # Start message poller
        poller_task = asyncio.create_task(
            mqtt_session.message_poller(
                topics=topics,
                trigger_devices={device_sn},
                msg_callback=capture.capture_all_messages,
                timeout=_POLLER_TIMEOUT,
            )
        )
        try:
            # Give the poller a moment before asking the user to act.
            await asyncio.sleep(_WARMUP_SECONDS)
            CONSOLE.info("🟢 Capture started - Use mobile app NOW!")

            # Monitor the capture window, reporting progress every 10 seconds.
            for elapsed in range(1, _CAPTURE_SECONDS + 1):
                await asyncio.sleep(1)
                if elapsed % 10 == 0:
                    cmd_count = len(capture.captured_commands)
                    data_count = len(capture.captured_data)
                    CONSOLE.info(
                        f" 📊 Captured: {cmd_count} commands, {data_count} data messages "
                        f"({elapsed}/{_CAPTURE_SECONDS}s)"
                    )
        finally:
            # Always stop the poller, even when the monitoring loop is interrupted,
            # so the background task is not left running.
            poller_task.cancel()
            try:
                await poller_task
            except asyncio.CancelledError:
                pass

        # Analysis
        CONSOLE.info("\n" + "=" * 80)
        CONSOLE.info("📊 CAPTURE ANALYSIS")
        CONSOLE.info("=" * 80)

        # Save before analysing so the raw capture survives any reporting error.
        saved_path = _save_capture(capture, device_sn)
        CONSOLE.info(f"💾 Capture saved to: {saved_path}")
        _print_analysis(capture)
# Script entry point: run the capture and report failures on the console.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C during the capture window is a normal way to stop early.
        CONSOLE.info("Capture aborted by user")
    except Exception as err:
        # Top-level boundary: show the error type and message instead of a traceback.
        CONSOLE.info(f"{type(err)}: {err}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment