Created
September 22, 2025 18:36
-
-
Save kumy/ba9801bb4aa2e72dfa0644c882fa690a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
import asyncio | |
from datetime import datetime | |
from hoymiles_wifi.protobuf import APPHeartbeatPB_pb2, APPInfomationData_pb2, GetConfig_pb2, RealDataNew_pb2, \ | |
WarnData_pb2 | |
import crcmod.predefined | |
OFFSET = 0 | |
crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000) | |
crc16_modbus = crcmod.predefined.Crc('modbus') | |
def printfields(message): | |
print(f"Sending: ", end ='') | |
for field_descriptor, value in message.ListFields(): | |
field_name = field_descriptor.name | |
field_value = value | |
print(f"{field_name}: {field_value} ", end='') | |
print('') | |
def async_WarnRes(payload, transport) -> WarnData_pb2.WarnResDTO | None: | |
WarnRes = WarnData_pb2.WarnReqDTO() | |
parsed = WarnRes.FromString(payload[10:]) | |
sequence = int(payload[4:6].hex(), 16) | |
print("parsed:") | |
print(parsed) | |
request = WarnData_pb2.WarnResDTO() | |
request.ymd_hms = ( | |
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") | |
) | |
request.offset = OFFSET | |
header = b'\x48\x4d\x23\x0B' | |
send(header, request, sequence, transport) | |
def async_RealDataNewReq(payload, transport, key) -> RealDataNew_pb2.RealDataNewResDTO | None: | |
RealDataNewReq = RealDataNew_pb2.RealDataNewReqDTO() | |
parsed = RealDataNewReq.FromString(payload[10:]) | |
sequence = int(payload[4:6].hex(), 16) | |
print("parsed:") | |
print(parsed) | |
request = RealDataNew_pb2.RealDataNewResDTO() | |
request.time_ymd_hms = ( | |
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") | |
) | |
request.offset = OFFSET | |
if key == 'C': | |
header = b'\x48\x4d\x23\x0C' | |
else: | |
header = b'\x48\x4d\x23\x0D' | |
send(header, request, sequence, transport) | |
def async_GetConfigRes(payload, transport) -> GetConfig_pb2.GetConfigResDTO | None: | |
GetConfigRes = GetConfig_pb2.GetConfigResDTO() | |
parsed = GetConfigRes.FromString(payload[10:]) | |
sequence = int(payload[4:6].hex(), 16) | |
print("parsed:") | |
print(parsed) | |
request = APPInfomationData_pb2.APPInfoDataResDTO() | |
request.time_ymd_hms = ( | |
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") | |
) | |
request.offset = OFFSET | |
header = b'\x48\x4d\x23\x09' | |
send(header, request, sequence, transport) | |
def async_app_information_data(payload, transport) -> APPInfomationData_pb2.APPInfoDataReqDTO | None: | |
APPInfoDataReq = APPInfomationData_pb2.APPInfoDataReqDTO() | |
parsed = APPInfoDataReq.FromString(payload[10:]) | |
sequence = int(payload[4:6].hex(), 16) | |
print("parsed:") | |
print(parsed) | |
request = APPInfomationData_pb2.APPInfoDataResDTO() | |
request.time_ymd_hms = ( | |
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") | |
) | |
request.offset = OFFSET | |
header = b'\x48\x4d\x23\x01' | |
send(header, request, sequence, transport) | |
def async_heartbeat(payload, transport) -> APPHeartbeatPB_pb2.HBReqDTO | None: | |
"""Request heartbeat.""" | |
# global sequence | |
HBReq = APPHeartbeatPB_pb2.HBReqDTO() | |
parsed = HBReq.FromString(payload[10:]) | |
sequence = int(payload[4:6].hex(), 16) | |
print(parsed) | |
print(sequence) | |
request = APPHeartbeatPB_pb2.HBResDTO() | |
request.time_ymd_hms = ( | |
datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode("utf-8") | |
) | |
request.offset = OFFSET | |
header = b'\x48\x4d\x23\x02' | |
send(header, request, sequence, transport) | |
def send(header, request, sequence, transport): | |
crc = crc16(request.SerializeToString()) | |
length = len(request.SerializeToString())+10 | |
# Send the data | |
print("Response:") | |
printfields(request) | |
# print(f"Sending: ", end ='') | |
# for field_descriptor, value in request.ListFields(): | |
# field_name = field_descriptor.name | |
# field_value = value | |
# print(f"{field_name}: {field_value} ", end='') | |
# print('') | |
message = header+sequence.to_bytes(2,byteorder='big')+crc.to_bytes(2, byteorder='big')+length.to_bytes(2, byteorder='big')+request.SerializeToString() | |
print('Send: {}'.format(message.hex())) | |
transport.write(message) | |
# sequence=sequence+1 | |
class EchoServerProtocol(asyncio.Protocol): | |
def connection_made(self, transport): | |
self.peername = transport.get_extra_info('peername') | |
print('\n************************************') | |
print('Connection from {}'.format(self.peername)) | |
self.transport = transport | |
def eof_received(self): | |
print('\n************************************') | |
print('Connection closed from {}'.format(self.peername)) | |
print('************************************') | |
def data_received(self, data): | |
print('\n====================================') | |
print('Connection from {}'.format(self.peername)) | |
print('Data received: {!r}\n'.format(data.hex())) | |
# if self.transport.get_extra_info('peername')[0] != '127.0.0.1' and self.transport.get_extra_info('peername')[0] != '192.168.130.209': | |
# print("skip real devices\n") | |
# self.transport.close() | |
# return | |
# print('Send: {!r}'.format(message)) | |
# self.transport.write(data) | |
try: | |
command_id = data[2:4].hex().upper() | |
print(f"COMMAND: {command_id}") | |
if command_id == "2201": | |
async_app_information_data(data, self.transport) | |
elif command_id == "2202": | |
async_heartbeat(data, self.transport) | |
elif command_id == "2209": | |
async_GetConfigRes(data, self.transport) | |
elif command_id == "220C": | |
async_RealDataNewReq(data, self.transport, 'C') | |
elif command_id == "220D": | |
async_RealDataNewReq(data, self.transport, 'D') | |
elif command_id == "220B": | |
# if self.transport.get_extra_info('peername')[0] != '127.0.0.1': | |
# print("skip real devices\n") | |
# self.transport.close() | |
# return | |
async_WarnRes(data, self.transport) | |
else: | |
print(f"Skip command {command_id}") | |
self.transport.close() | |
return | |
except Exception as e: | |
print(f"e: {e}") | |
print('--- End of query\n') | |
# print('Close the client socket\n') | |
# self.transport.close() | |
async def main(): | |
# Get a reference to the event loop as we plan to use | |
# low-level APIs. | |
loop = asyncio.get_running_loop() | |
print("Starting service") | |
server = await loop.create_server( | |
lambda: EchoServerProtocol(), | |
'0.0.0.0', 10081) | |
async with server: | |
await server.serve_forever() | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment