Skip to content

Instantly share code, notes, and snippets.

@danielfaust
Last active June 8, 2026 20:17
Show Gist options
  • Select an option

  • Save danielfaust/3e93300b858022acb364f21b71496ec8 to your computer and use it in GitHub Desktop.

Select an option

Save danielfaust/3e93300b858022acb364f21b71496ec8 to your computer and use it in GitHub Desktop.
Read out Engelmann FAW water meters via optical M-Bus in Python.
'''
Engelmann Funkaufsatzmodul FAW
https://www.engelmann.de/product/funkaufsatzmodul-faw/
Platine ttl ir lesekopf lese-schreib-Kopf EHZ Volkszähler Hichi Smartmeter
https://www.ebay.de/itm/313525835802
Wärmezähler über optische M-Bus-Schnittstelle auslesen
https://www.mikrocontroller.net/topic/438972?page=single
'''
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import io
import os
import time
import json
import serial
import logging
import datetime
import binascii
import threading
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LOG_TO_FILE = True
READ_SENSORS = True
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
READ_RETRIES = 10
WRITE_55_COUNT = 528 # 2400 (bits/second) * 2.2 (seconds) = 5280 (bits) | 5280 (bits) / 10 (bits/byte) = 528 (bytes) | 10 (bits/byte) = 1 (start bit) + 8 (data bits) + 1 (stop bit)
SLEEP_55_SECONDS = 0.350
SLEEP_55_SECONDS = 0.1875 # 330 / 2400 + 0.05 | EN1434-3: 330 bitperioden / 2400 baud + 50 ms "Wartezeit von 234ms (10 Zeichen Wartezeit? - Eigentlich schreibt EN1434-3 330 Bitperioden + 50ms vor)"
SLEEP_55_SECONDS = 0.100
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
devices = [
# fill the mock data with the data returned from the first reading of the sensor (75 bytes)
{'tty': '/dev/ttyUSB0', 'mock-data': '68~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~16'},
]
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
meters = {
# serial meter mbus
'12345678': {'name':'badezimmer warmwasser', 'offset': 12345 - 1234},
'23456789': {'name':'badezimmer kaltwasser', 'offset': 23456 - 2345},
}
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_data(ser=None):
try:
result = None
attempt = 0
while attempt < READ_RETRIES:
attempt += 1
NOW = datetime.datetime.now()
file = io.StringIO()
if LOG_TO_FILE:
if not os.path.exists('log-files'):
os.makedirs('log-files')
file = open(f"log-files/log--{NOW.strftime('%Y-%m-%d--%H%M%S')}--{attempt}.txt", 'w')
with file:
logger.info(f"waking up sensor, attempt {attempt}")
start_wakeup = time.perf_counter()
ser.parity = serial.PARITY_NONE
ser.write(b'\x55' * WRITE_55_COUNT)
ser.flush()
stop_wakeup = time.perf_counter()
logger.info(f"sent wakeup sequence in {stop_wakeup - start_wakeup:.3f} seconds (probably buffered)")
if SLEEP_55_SECONDS > 0:
time.sleep(SLEEP_55_SECONDS)
logger.info("sending data request sequence 105bfe5916")
read_data = b'\x10\x5B\xFE\x59\x16'
ser.parity = serial.PARITY_EVEN
ser.write(read_data)
ser.flush()
result = b""
datagram = b""
is_echo = True
logger.info("reading received data")
while True:
try:
if len(datagram) == 75:
print()
if LOG_TO_FILE:
file.write(f"\n{binascii.hexlify(datagram).decode()}\n")
file.flush()
logger.info("finished reading response")
return result
byte = ser.read(1)
if not byte:
logger.warning("no bytes received from sensor")
print(len(datagram))
result = None
break
result += byte
byte_array_hex = binascii.hexlify(byte)
byte_array_hex_str = byte_array_hex.decode()
if not is_echo:
print(byte_array_hex_str, flush=True, end='')
datagram += byte
if LOG_TO_FILE:
file.write(byte_array_hex_str + ' ')
file.flush()
if binascii.hexlify(result[-5:]).decode() == '105bfe5916':
logger.info("found echoed 105bfe5916, reading response")
result = b""
is_echo = False
except serial.SerialTimeoutException as e:
logger.warning("timeout occurred", exc_info=e)
print(len(datagram))
result = None
break
except serial.SerialException as e:
logger.warning("serial communication error while reading response", exc_info=e)
print(len(datagram))
result = None
break
except Exception as e:
logger.error("An unexpected error occurred while reading response", exc_info=e)
print(len(datagram))
result = None
break
return result
except serial.SerialException as e:
logger.warning("serial communication error", exc_info=e)
return None
except Exception as e:
logger.error("An unexpected error occurred while reading response", exc_info=e)
return None
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
start_sequence = '55105bfe5916'
intro_sequence = '68454568080072'
time_sequence = '046d'
total_sequence = '0413'
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def little_to_big_endian(hex_string):
bytes_list = [hex_string[i:i+2] for i in range(0, len(hex_string), 2)]
bytes_list.reverse()
return ''.join(bytes_list)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def hex_to_datetime(hex_value):
hex_bytes = bytes.fromhex(hex_value)
minute = hex_bytes[0] & 0x3F # n5..n0 (6 bits)
hour = hex_bytes[1] & 0x1F # h4..h0 (5 bits)
day = hex_bytes[2] & 0x1F # j4..j0 (5 bits)
month = hex_bytes[3] & 0x0F # M3..M0 (4 bits)
year = ((hex_bytes[2] >> 5) | ((hex_bytes[3] & 0x70) >> 1))
return datetime.datetime(2000 + year, month, day, hour, minute)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def hex_to_decimal(msb_hex):
return int(msb_hex, 16)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def process_data(data):
_serial_index = data.find(intro_sequence) + len(intro_sequence)
_serial = data[_serial_index:_serial_index+8]
_serial = little_to_big_endian(_serial)
_time_index = data.find(time_sequence) + len(time_sequence)
_time = data[_time_index:_time_index+8]
_time = hex_to_datetime(_time)
_total_index = data.find(total_sequence) + len(total_sequence)
_total = data[_total_index:_total_index+8]
_total = hex_to_decimal(little_to_big_endian(_total))
if _serial not in meters:
print(f"Unknown serial number: {_serial}. Add it to the \"meters\" list as follows:\n'{_serial}': {{'name': '<meter name>', 'offset': <analog value as displayed on meter> - {_total}}},\n")
else:
_meter = meters[_serial]
_meter['now'] = datetime.datetime.now()
_meter['time'] = _time
_meter['total'] = _total + _meter['offset']
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for device in devices:
if True:
raw_data_hex_str = None
if READ_SENSORS:
ser = serial.Serial(device['tty'], baudrate=2400, bytesize=8, parity=serial.PARITY_NONE, stopbits=1, timeout=2)
raw_data = get_data(ser)
ser.close()
if raw_data is not None:
raw_data_hex_str = binascii.hexlify(raw_data).decode()
else:
raw_data_hex_str = device['mock-data']
print(device['tty'], raw_data_hex_str)
print()
if raw_data_hex_str is not None:
process_data(raw_data_hex_str)
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
print(f"\n--------------------------\n")
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for _serial, data in meters.items():
if 'time' in data:
_name = data['name']
_time = data['time']
_total = data['total']
print(f"{_name} | {_serial} | {_time.strftime('%Y-%m-%d %H:%M')} | {_total:>5} Liter")
#:: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@Rapptor33

Copy link
Copy Markdown

Hallo habe genau das selbe Funkzusatzmodul bei mir zu Hause Engelmann FAW.
Funktioniert der Hichi Wifi V2 --> https://sites.google.com/view/hichi-lesekopf/wifi-v2 für das Modul?

@danielfaust

Copy link
Copy Markdown
Author

Hallo, das weiß ich nicht. Ich benutze diesen hier https://www.ebay.de/itm/313525835802 der auch ein Hichi ist. Ich habe mehrere davon im Einsatz, auch an Stromzählern. Wenn der Hichi Wifi V2 die Rohdaten per WLAN einfach nur durchreichen kann, dann wäre das auch für mich interessant.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment