Skip to content

Instantly share code, notes, and snippets.

@kumy
Created September 22, 2025 18:36
Show Gist options
  • Save kumy/ba9801bb4aa2e72dfa0644c882fa690a to your computer and use it in GitHub Desktop.
Save kumy/ba9801bb4aa2e72dfa0644c882fa690a to your computer and use it in GitHub Desktop.
#!/bin/env python3
import asyncio
from datetime import datetime
from hoymiles_wifi.protobuf import APPHeartbeatPB_pb2, APPInfomationData_pb2, GetConfig_pb2, RealDataNew_pb2, \
WarnData_pb2
import crcmod.predefined
OFFSET = 0
crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
crc16_modbus = crcmod.predefined.Crc('modbus')
def printfields(message):
print(f"Sending: ", end ='')
for field_descriptor, value in message.ListFields():
field_name = field_descriptor.name
field_value = value
print(f"{field_name}: {field_value} ", end='')
print('')
def async_WarnRes(payload, transport) -> WarnData_pb2.WarnResDTO | None:
WarnRes = WarnData_pb2.WarnReqDTO()
parsed = WarnRes.FromString(payload[10:])
sequence = int(payload[4:6].hex(), 16)
print("parsed:")
print(parsed)
request = WarnData_pb2.WarnResDTO()
request.ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = OFFSET
header = b'\x48\x4d\x23\x0B'
send(header, request, sequence, transport)
def async_RealDataNewReq(payload, transport, key) -> RealDataNew_pb2.RealDataNewResDTO | None:
RealDataNewReq = RealDataNew_pb2.RealDataNewReqDTO()
parsed = RealDataNewReq.FromString(payload[10:])
sequence = int(payload[4:6].hex(), 16)
print("parsed:")
print(parsed)
request = RealDataNew_pb2.RealDataNewResDTO()
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = OFFSET
if key == 'C':
header = b'\x48\x4d\x23\x0C'
else:
header = b'\x48\x4d\x23\x0D'
send(header, request, sequence, transport)
def async_GetConfigRes(payload, transport) -> GetConfig_pb2.GetConfigResDTO | None:
GetConfigRes = GetConfig_pb2.GetConfigResDTO()
parsed = GetConfigRes.FromString(payload[10:])
sequence = int(payload[4:6].hex(), 16)
print("parsed:")
print(parsed)
request = APPInfomationData_pb2.APPInfoDataResDTO()
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = OFFSET
header = b'\x48\x4d\x23\x09'
send(header, request, sequence, transport)
def async_app_information_data(payload, transport) -> APPInfomationData_pb2.APPInfoDataReqDTO | None:
APPInfoDataReq = APPInfomationData_pb2.APPInfoDataReqDTO()
parsed = APPInfoDataReq.FromString(payload[10:])
sequence = int(payload[4:6].hex(), 16)
print("parsed:")
print(parsed)
request = APPInfomationData_pb2.APPInfoDataResDTO()
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = OFFSET
header = b'\x48\x4d\x23\x01'
send(header, request, sequence, transport)
def async_heartbeat(payload, transport) -> APPHeartbeatPB_pb2.HBReqDTO | None:
"""Request heartbeat."""
# global sequence
HBReq = APPHeartbeatPB_pb2.HBReqDTO()
parsed = HBReq.FromString(payload[10:])
sequence = int(payload[4:6].hex(), 16)
print(parsed)
print(sequence)
request = APPHeartbeatPB_pb2.HBResDTO()
request.time_ymd_hms = (
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8")
)
request.offset = OFFSET
header = b'\x48\x4d\x23\x02'
send(header, request, sequence, transport)
def send(header, request, sequence, transport):
crc = crc16(request.SerializeToString())
length = len(request.SerializeToString())+10
# Send the data
print("Response:")
printfields(request)
# print(f"Sending: ", end ='')
# for field_descriptor, value in request.ListFields():
# field_name = field_descriptor.name
# field_value = value
# print(f"{field_name}: {field_value} ", end='')
# print('')
message = header+sequence.to_bytes(2,byteorder='big')+crc.to_bytes(2, byteorder='big')+length.to_bytes(2, byteorder='big')+request.SerializeToString()
print('Send: {}'.format(message.hex()))
transport.write(message)
# sequence=sequence+1
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.peername = transport.get_extra_info('peername')
print('\n************************************')
print('Connection from {}'.format(self.peername))
self.transport = transport
def eof_received(self):
print('\n************************************')
print('Connection closed from {}'.format(self.peername))
print('************************************')
def data_received(self, data):
print('\n====================================')
print('Connection from {}'.format(self.peername))
print('Data received: {!r}\n'.format(data.hex()))
# if self.transport.get_extra_info('peername')[0] != '127.0.0.1' and self.transport.get_extra_info('peername')[0] != '192.168.130.209':
# print("skip real devices\n")
# self.transport.close()
# return
# print('Send: {!r}'.format(message))
# self.transport.write(data)
try:
command_id = data[2:4].hex().upper()
print(f"COMMAND: {command_id}")
if command_id == "2201":
async_app_information_data(data, self.transport)
elif command_id == "2202":
async_heartbeat(data, self.transport)
elif command_id == "2209":
async_GetConfigRes(data, self.transport)
elif command_id == "220C":
async_RealDataNewReq(data, self.transport, 'C')
elif command_id == "220D":
async_RealDataNewReq(data, self.transport, 'D')
elif command_id == "220B":
# if self.transport.get_extra_info('peername')[0] != '127.0.0.1':
# print("skip real devices\n")
# self.transport.close()
# return
async_WarnRes(data, self.transport)
else:
print(f"Skip command {command_id}")
self.transport.close()
return
except Exception as e:
print(f"e: {e}")
print('--- End of query\n')
# print('Close the client socket\n')
# self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
print("Starting service")
server = await loop.create_server(
lambda: EchoServerProtocol(),
'0.0.0.0', 10081)
async with server:
await server.serve_forever()
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment