Last active
December 20, 2021 15:02
-
-
Save bmxp/7da6e85e766f886b55fbda6118aeb491 to your computer and use it in GitHub Desktop.
DLMS Improvement
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab | |
######################################################################### | |
# Copyright 2013 - 2015 KNX-User-Forum e.V. http://knx-user-forum.de/ | |
# Copyright 2016 - 2021 Bernd Meiners [email protected] | |
######################################################################### | |
# | |
# DLMS plugin for SmartHomeNG | |
# | |
# This file is part of SmartHomeNG.py. | |
# Visit: https://github.com/smarthomeNG/ | |
# https://knx-user-forum.de/forum/supportforen/smarthome-py | |
# https://smarthomeng.de | |
# | |
# SmartHomeNG.py is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# SmartHomeNG.py is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with SmartHomeNG.py. If not, see <http://www.gnu.org/licenses/>. | |
######################################################################### | |
__license__ = "GPL" | |
__version__ = "2.0" | |
__revision__ = "0.1" | |
__docformat__ = 'reStructuredText' | |
import logging | |
import datetime | |
from ruamel.yaml import YAML | |
# Select the logger depending on how this module is used: a logger named after
# the module when run as a script, the root logger when imported as a component
# of the plugin.
if __name__ == '__main__':
    logger = logging.getLogger(__name__)
    logger.debug(f"init standalone {__name__}")
else:
    logger = logging.getLogger()
    # the f prefix was missing here, so the placeholder was logged literally
    logger.debug(f"init plugin component {__name__}")
import time | |
import serial | |
import re | |
from threading import Semaphore | |
# Optional lookup table that maps the three letter manufacturer id of a
# smartmeter to the manufacturer name. The table is read from a YAML file in
# the current working directory; without that file the table stays empty.
manufacturer_ids = {}
exportfile = 'manufacturer.yaml'
try:
    with open(exportfile, 'r') as infile:
        y = YAML(typ='safe')
        loaded = y.load(infile)
    # an empty or malformed file must not replace the dict by something
    # that has no ``get`` method
    if isinstance(loaded, dict):
        manufacturer_ids = loaded
except OSError as e:
    # a missing or unreadable file is not an error, the table is optional
    logger.debug(f"Manufacturer list '{exportfile}' could not be read: {e}")
except Exception as e:
    # most likely a YAML syntax problem, continue without the table
    logger.debug(f"Manufacturer list '{exportfile}' could not be parsed: {e}")
""" | |
This module implements the query of a smartmeter using the DLMS protocol.
The smartmeter needs to have an infrared interface, and an IR adapter is needed to connect it via USB.

The character format for protocol modes A - D is defined as 1 start bit, 7 data bits, 1 parity bit (even parity) and 1 stop bit.
In protocol mode E a format of 1 start bit, 8 data bits and 1 stop bit is allowed, see Annex E of IEC62056-21.
Protocol mode E is neither implemented nor supported by this plugin.

Abbreviations
-------------
COSEM
    COmpanion Specification for Energy Metering
OBIS
    OBject Identification System (see iec62056-61{ed1.0}en_obis_protocol.pdf)
""" | |
# Control characters used to frame the messages on the serial line
SOH = 0x01 # start of header
STX = 0x02 # start of text
ETX = 0x03 # end of text
ACK = 0x06 # acknowledge
CR = 0x0D # carriage return
LF = 0x0A # linefeed
# Block check character: the checksum that immediately follows the data packet.
# query() takes the actual value from the last byte of the response.
BCC = 0x00
def format_time(timedelta):
    """
    Return a pretty formatted string for a duration, scaled to a fitting unit.

    The unit is chosen by the magnitude of the value, so a string is returned
    for every number, including zero, negative values and values below one
    nanosecond (the latter are all reported in ns).

    :param timedelta: duration given in seconds (int or float)
    :return: the duration with two decimals followed by s, ms, µs or ns
    """
    magnitude = abs(timedelta)
    if magnitude > 1.0:
        return f"{timedelta:.2f} s"
    if magnitude > 0.001:
        return f"{timedelta * 1000.0:.2f} ms"
    if magnitude > 0.000001:
        return f"{timedelta * 1000000.0:.2f} µs"
    # smallest unit available, also used for zero so that callers never get None
    return f"{timedelta * 1000000000.0:.2f} ns"
def read_data_block_from_serial(the_serial, end_byte=0x0a, start_byte=None, max_read_time=None):
    """
    Read bytes one by one from a serial interface until a stop condition is met.

    Reading stops when

    * ``the_serial.read()`` returns no data (the read timeout of the interface expired),
    * the data read so far ends with ``end_byte``, or
    * more than ``max_read_time`` seconds have passed since the start of the call.

    :param the_serial: interface to read from, needs a ``read()`` method returning ``bytes``
    :param end_byte: the indicator for end of data, given as int or bytes;
        it will be included in the response. ``None`` disables the end detection
    :param start_byte: the indicator for start of data, given as int or bytes;
        whenever it is encountered all data read before is discarded,
        it will be included in the response
    :param max_read_time: maximum overall time in seconds or ``None`` for no limit
    :returns: the data read as ``bytes`` or ``None`` if an error occurred
    """
    def as_marker(value):
        # Callers pass markers as int (0x0a) as well as bytes (b'!');
        # normalize to bytes so that every comparison is bytes against bytes.
        if value is None:
            return None
        if isinstance(value, int):
            return bytes([value])
        return bytes(value)

    end_marker = as_marker(end_byte)
    start_marker = as_marker(start_byte)

    logger.debug("start to read data from serial device")
    response = bytearray()
    starttime = time.time()
    try:
        while True:
            ch = the_serial.read()
            runtime = time.time()
            if len(ch) == 0:
                # timeout of the serial interface, nothing more to expect
                break
            if start_marker is not None and ch == start_marker:
                # (re)start of a data block, drop everything seen so far
                response = bytearray()
            response += ch
            if end_marker and response.endswith(end_marker):
                break
            if max_read_time is not None and runtime - starttime > max_read_time:
                break
    except Exception as e:
        logger.debug(f"Exception {e} occurred in read data block from serial")
        return None
    logger.debug(f"finished reading data from serial device after {len(response)} bytes")
    return bytes(response)
def _open_dlms_port(port, baudrate, timeout):
    """
    Open the serial port with 7 data bits, even parity and 1 stop bit.

    :param port: name of the serial port, e.g. /dev/ttyUSB0
    :param baudrate: baudrate to open the port with
    :param timeout: read timeout in seconds
    :return: the opened ``serial.Serial`` object or None if the port could not be opened
    """
    try:
        dlms_serial = serial.Serial(port,
                                    baudrate,
                                    bytesize=serial.SEVENBITS,
                                    parity=serial.PARITY_EVEN,
                                    stopbits=serial.STOPBITS_ONE,
                                    timeout=timeout)
    except FileNotFoundError:
        logger.error(f"Serial port '{port}' does not exist, please check your port")
        return None
    except serial.SerialException:
        # SerialException is derived from OSError (IOError), thus it needs
        # to be handled before the more general OSError
        logger.error(f"Serial port '{port}' could not be opened")
        return None
    except OSError:
        logger.error(f"Serial port '{port}' does not exist, please check the spelling")
        return None
    except Exception as e:
        logger.error(f"Another unknown error occurred: '{e}'")
        return None

    if not port == dlms_serial.name:
        logger.debug(f"Asked for {port} as serial port, but really using now {dlms_serial.name}")
    if not dlms_serial.isOpen():
        logger.error(f"Serial port '{port}' could not be opened with given parameters, maybe wrong baudrate?")
        return None
    return dlms_serial


def _block_checksum_ok(response):
    """
    Check the block check character of a data block framed by STX and ETX.

    The data block may be encapsulated as ``STX Datablock ! CR LF ETX BCC``.
    The checksum is the XOR over all bytes following STX up to and including ETX.
    A response without that framing only leads to warnings in the log.

    :param response: the data block as bytes
    :return: False if the framing was found but the checksum does not match, True otherwise
    """
    logger.debug("trying now to calculate a checksum")
    # the length checks protect against an IndexError for empty or tiny responses
    has_stx = len(response) > 0 and response[0] == STX
    has_etx = len(response) > 1 and response[-2] == ETX

    if has_stx:
        logger.debug("STX found")
    else:
        head = ' '.join(hex(i) for i in response[:10])
        logger.warning(f"STX not found in response='{head}...'")
    if has_etx:
        logger.debug("ETX found")
    else:
        tail = ' '.join(hex(i) for i in response[-11:])
        logger.warning(f"ETX not found in response='...{tail}'")

    if not (len(response) > 5 and has_stx and has_etx):
        logger.warning("STX - ETX not found")
        return True

    # start with char after STX, end with ETX including, checksum must match last byte (BCC)
    block_check_character = response[-1]
    logger.debug(f"block check character BCC is {block_check_character}")
    checksum = 0
    for i in response[1:-1]:
        checksum ^= i
    if checksum != block_check_character:
        payload = ' '.join(hex(i) for i in response[1:-1])
        logger.warning(f"checksum/protocol error: response={payload} checksum={checksum}")
        return False
    logger.debug("checksum over data response was ok, data is valid")
    return True


def query(config):
    """
    Read the block of OBIS data from a smartmeter via a serial interface.

    This function will

    1. open a serial communication line to the smartmeter
    2. send a request for info (not in listen only mode)
    3. parse the device's first (and maybe second) answer for capabilities of the device
    4. adjust the speed of the communication accordingly
    5. read out the block of OBIS information
    6. close the serial communication, this also happens whenever the query is aborted

    :param config: a dict with entries for 'serialport', 'device' (default ''),
        'speed' (initial baudrate, default 300), 'querycode' (default '?'),
        'use_checksum' (default True), 'timeout' (default 3) and
        'onlylisten' (default False). An entry 'baudrate_fix' is accepted but
        currently not evaluated. If the query runs to its end, the entry
        'suggested_cycle' is written with the duration of the query plus 10 seconds.
    :return: a textblock with the data response from smartmeter or None if the query failed
    """
    # for the performance of the serial read we need to save the actual time
    starttime = time.time()
    runtime = starttime
    result = None

    serial_port = config.get('serialport')
    device = config.get('device', '')
    initial_baudrate = config.get('speed', 300)
    query_code = config.get('querycode', '?')
    use_checksum = config.get('use_checksum', True)
    timeout = config.get('timeout', 3)
    only_listen = config.get('onlylisten', False)  # just for the case that smartmeter transmits data without a query first
    logger.debug(f"Config='{config}'")

    start_char = b'/'[0]
    request_message = b"/" + query_code.encode('ascii') + device.encode('ascii') + b"!\r\n"

    # about timeout: time tr between sending a request and an answer needs to be
    # 200ms < tr < 1500ms for protocol mode A or B
    # inter character time must be smaller than 1500 ms
    # The time between the reception of a message and the transmission of an answer is:
    # (20 ms) 200 ms = tr = 1 500 ms (see item 12) of 6.3.14).
    # If a response has not been received, the waiting time of the transmitting equipment after
    # transmission of the identification message, before it continues with the transmission, is:
    # 1 500 ms < tt = 2 200 ms
    # The time between two characters in a character sequence is:
    # ta < 1 500 ms
    wait_before_acknowledge = 0.4   # wait for 400 ms before sending the request to change baudrate
    wait_after_acknowledge = 0.4    # wait for 400 ms after sending acknowledge

    dlms_serial = _open_dlms_port(serial_port, initial_baudrate, timeout)
    if dlms_serial is None:
        return None
    logger.debug(f"Time to open serial port {serial_port}: {format_time(time.time()- runtime)}")
    runtime = time.time()

    acknowledge = b''   # preset empty answer

    # the finally clause makes sure that the port is released on every way out
    try:
        if not only_listen:
            # start a dialog with smartmeter
            try:
                logger.debug(f"Writing request message {request_message} to serial port '{serial_port}'")
                dlms_serial.write(request_message)
            except Exception as e:
                logger.warning(f"Error {e}")
                return None
            logger.debug(f"Time to send first request to smartmeter: {format_time(time.time()- runtime)}")

            # now get first response
            response = read_data_block_from_serial(dlms_serial)
            if response is None:
                logger.debug("No response received upon first request")
                return None
            logger.debug(f"Time to receive an answer: {format_time(time.time()- runtime)}")
            runtime = time.time()

            # some meters answer with an echo of the request message
            if response == request_message:
                logger.debug("Request Message was echoed, need to read the identification message")
                # now read the capabilities and type/brand line from smartmeter
                # e.g. b'/LGZ5\\2ZMD3104407.B32\r\n'
                response = read_data_block_from_serial(dlms_serial)
                if response is None:
                    logger.debug("No identification message received after the echoed request")
                    return None
            else:
                logger.debug("Request Message was not equal to response, treating as identification message")
            logger.debug(f"Time to get first identification message from smartmeter: {format_time(time.time() - runtime)}")
            runtime = time.time()

            identification_message = response
            logger.debug(f"Identification Message is {identification_message}")

            # need at least 7 bytes:
            # 1 byte "/"
            # 3 bytes short Identification
            # 1 byte speed indication
            # 2 bytes CR LF
            if len(identification_message) < 7:
                logger.warning(f"malformed identification message: '{identification_message}', abort query")
                return None
            if identification_message[0] != start_char:
                logger.warning(f"identification message '{identification_message}' does not start with '/', abort query")
                return None

            manid = str(identification_message[1:4], 'utf-8')
            manname = manufacturer_ids.get(manid, 'unknown')
            logger.debug(f"The manufacturer for {manid} is {manname} (out of {len(manufacturer_ids)} given manufacturers)")

            # Different smartmeters allow for different protocol modes.
            # The protocol mode decides whether the communication is fixed to a certain baudrate
            # or might be speed up.
            # Some meters do initiate a protocol by themselves with a fixed speed of 2400 baud e.g. Mode D
            # However some meters specify a speed of 9600 Baud although they use protocol mode D (readonly)
            #
            # In protocol mode A the communication always stays at the same speed,
            # the protocol indicator can be anything except for A-I, 0-9, /, ?
            baudrates_protocol_mode_a = 300
            baudrates_protocol_mode_b = {'A': 600, 'B': 1200, 'C': 2400, 'D': 4800, 'E': 9600, 'F': 19200,
                                         'G': "reserved", 'H': "reserved", 'I': "reserved"}
            baudrates_protocol_mode_c = {'0': 300, '1': 600, '2': 1200, '3': 2400, '4': 4800, '5': 9600, '6': 19200,
                                         '7': "reserved", '8': "reserved", '9': "reserved"}
            # Protocol mode D is always '3' (2400 Baud) but it is always initiated by the metering
            # device so it can't be encountered here.
            # Protocol mode E uses the same baudrate ids as protocol mode C.

            baudrate_identification = chr(identification_message[4])
            if baudrate_identification in baudrates_protocol_mode_b:
                new_baudrate = baudrates_protocol_mode_b[baudrate_identification]
                protocol_mode = 'B'
            elif baudrate_identification in baudrates_protocol_mode_c:
                new_baudrate = baudrates_protocol_mode_c[baudrate_identification]
                protocol_mode = 'C'     # could also be 'E' but it doesn't make any difference here
            else:
                new_baudrate = baudrates_protocol_mode_a
                protocol_mode = 'A'
            logger.debug(f"Baudrate id is '{baudrate_identification}' thus Protocol Mode is {protocol_mode} and suggested Baudrate is {new_baudrate} Bd")

            if not isinstance(new_baudrate, int):
                # a reserved id has no baudrate assigned that the port could be switched to
                logger.warning(f"Baudrate id '{baudrate_identification}' is reserved, abort query")
                return None

            if chr(identification_message[5]) == '\\':
                if chr(identification_message[6]) == '2':
                    logger.debug("HDLC protocol could be used if it was implemented")
                else:
                    logger.debug("Another protocol could probably be used if it was implemented")

            # for protocol C or E we now send an acknowledge and include the new baudrate parameter
            # maybe todo
            # we could implement here a baudrate that is fixed to somewhat lower speed if we need to
            # read out a smartmeter with broken communication
            action = b'0'   # Data readout, possible are also b'1' for programming mode or some manufacturer specific
            acknowledge = b'\x060' + baudrate_identification.encode() + action + b'\r\n'

            if protocol_mode == 'C':
                # the speed change in communication is initiated from the reading device
                time.sleep(wait_before_acknowledge)
                logger.debug(f"Using protocol mode C, send acknowledge {acknowledge} and tell smartmeter to switch to {new_baudrate} Baud")
                try:
                    dlms_serial.write(acknowledge)
                except Exception as e:
                    logger.warning(f"Warning {e}")
                    return None
                time.sleep(wait_after_acknowledge)
                if new_baudrate != initial_baudrate:
                    # change request to set higher baudrate
                    dlms_serial.baudrate = new_baudrate
            elif protocol_mode == 'B':
                # the speed change in communication is initiated from the smartmeter device
                time.sleep(wait_before_acknowledge)
                logger.debug(f"Using protocol mode B, smartmeter and reader will switch to {new_baudrate} Baud")
                time.sleep(wait_after_acknowledge)
                if new_baudrate != initial_baudrate:
                    # change request to set higher baudrate
                    dlms_serial.baudrate = new_baudrate
            else:
                logger.debug(f"No change of readout baudrate, "
                             f"smartmeter and reader will stay at {new_baudrate} Baud")

            # now read the huge data block with all the OBIS codes,
            # without an end byte the read ends with the timeout of the port
            logger.debug("Reading OBIS data from smartmeter")
            response = read_data_block_from_serial(dlms_serial, None)
        else:
            # only listen mode
            response = read_data_block_from_serial(dlms_serial, b'!', b'/')
            if response is None:
                logger.debug("No data received while listening to smartmeter")
                return None
            # the identification message is the first line of the data received
            identification_message = response.decode('utf-8', errors='replace').partition('\r\n')[0]
            manid = identification_message[1:4]
            manname = manufacturer_ids.get(manid, 'unknown')
            logger.debug(f"The manufacturer for {manid} is {manname} (out of {len(manufacturer_ids)} given manufacturers)")
    finally:
        dlms_serial.close()

    if response is None:
        logger.debug("No OBIS data received from smartmeter")
        return None

    logger.debug(f"Time for reading OBIS data: {format_time(time.time()- runtime)}")
    runtime = time.time()

    # Display performance of the serial communication
    logger.debug(f"Whole communication with smartmeter took {format_time(time.time() - starttime)}")

    # an empty acknowledge (listen only mode) can not have been echoed
    if acknowledge and response.startswith(acknowledge):
        logger.debug("Acknowledge echoed from smartmeter")
        response = response[len(acknowledge):]

    if use_checksum:
        if not _block_checksum_ok(response):
            return None
    else:
        logger.debug("checksum calculation skipped")

    if len(response) > 5:
        # strip the leading STX and the trailing CR LF ETX BCC
        result = str(response[1:-4], 'ascii')
        logger.debug(f"parsing OBIS codes took {format_time(time.time()- runtime)}")
    else:
        logger.debug("Sorry response did not contain enough data for OBIS decode")

    suggested_cycle = (time.time() - starttime) + 10.0
    config['suggested_cycle'] = suggested_cycle
    logger.debug(f"the whole query took {format_time(time.time()- starttime)}, suggested cycle thus is at least {format_time(suggested_cycle)}")
    return result
# Standalone mode: query the smartmeter given on the command line and print the result
if __name__ == '__main__':
    import sys
    import argparse

    parser = argparse.ArgumentParser(description='Query a smartmeter at a given port for DLMS output',
                                     usage='use "%(prog)s --help" for more information',
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('port', help='specify the port to use for the smartmeter query, e.g. /dev/ttyUSB0 or /dev/dlms0')
    parser.add_argument('-v', '--verbose', help='print verbose information', action='store_true')
    parser.add_argument('-t', '--timeout', help='maximum time to wait for a message from the smartmeter', type=float, default=3.0)
    parser.add_argument('-s', '--speed', help='initial baudrate to start the communication with the smartmeter', type=int, default=300)
    parser.add_argument('-d', '--device', help='give a device address to include in the query', default='')
    parser.add_argument('-q', '--querycode', help='define alternative query code\ndefault query code is ?\nsome smartmeters provide additional information when sending\nan alternative query code, e.g. 2 instead of ?', default='?')
    parser.add_argument('-l', '--onlylisten', help='Only listen to serial, no active query', action='store_true')
    # store_true: the baudrate is only kept fixed when the flag is given
    parser.add_argument('-f', '--baudrate_fix', help='Keep baudrate speed fixed', action='store_true')
    # the checksum is used by default, giving the flag switches it off
    parser.add_argument('-c', '--nochecksum', dest='use_checksum', help='do not use a checksum', action='store_false')

    args = parser.parse_args()

    config = {
        'serialport': args.port,
        'device': args.device,
        'querycode': args.querycode,
        'speed': args.speed,
        'baudrate_fix': args.baudrate_fix,
        'timeout': args.timeout,
        'onlylisten': args.onlylisten,
        'use_checksum': args.use_checksum,
    }

    if args.verbose:
        # show all debug output together with time, logger name and line number
        loglevel = logging.DEBUG
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s @ %(lineno)d')
    else:
        # just like print, debug output is suppressed
        loglevel = logging.INFO
        formatter = logging.Formatter('%(message)s')

    logging.getLogger().setLevel(loglevel)
    ch = logging.StreamHandler()
    ch.setLevel(loglevel)
    ch.setFormatter(formatter)
    # add the handler to the root logger
    logging.getLogger().addHandler(ch)

    logger.info("This is DLMS Plugin running in standalone mode")
    logger.info("==============================================")

    result = query(config)

    if result is None:
        logger.info(f"No results from query, maybe a problem with the serial port '{config['serialport']}' given ")
        logger.info("==============================================")
    elif len(result) > 0:
        logger.info("These are the results of the query")
        logger.info("==============================================")
        logger.info(result)
        logger.info("==============================================")
    else:
        logger.info("The query did not get any results!")
        logger.info("Maybe the serial was occupied or there was an error")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab | |
######################################################################### | |
# Copyright 2016 - 2021 Bernd Meiners [email protected] | |
######################################################################### | |
# | |
# DLMS plugin for SmartHomeNG | |
# | |
# This file is part of SmartHomeNG.py. | |
# Visit: https://github.com/smarthomeNG/ | |
# https://knx-user-forum.de/forum/supportforen/smarthome-py | |
# https://smarthomeng.de | |
# | |
# SmartHomeNG.py is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# SmartHomeNG.py is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with SmartHomeNG.py. If not, see <http://www.gnu.org/licenses/>. | |
######################################################################### | |
""" | |
In standalone mode this Python program will visit
https://www.dlms.com/srv/lib/Export_Flagids.php
to download the list of manufacturer ids for smartmeters as an XLSX file.
The columns therein are labeled "FLAG ID", "Manufacturer", "Country" and "World Region".
``FLAG ID`` is exactly 3 characters long.
The result will be stored locally as ``manufacturer.yaml``
to serve as an information database for the identification of smartmeters.
""" | |
import logging | |
import requests | |
import sys | |
from ruamel.yaml import YAML | |
from io import BytesIO | |
# hint shown to the user when the package needed to read XLSX files is missing
install_openpyxl = "python3 -m pip install --user openpyxl"

# Select the logger depending on how this module is used: a logger named after
# the module when run as a script, the root logger when imported.
if __name__ == '__main__':
    logger = logging.getLogger(__name__)
    logger.debug(f"init standalone {__name__}")
else:
    logger = logging.getLogger()
    logger.debug(f"init plugin component {__name__}")

try:
    import openpyxl
except ImportError:
    # only a failing import is handled here, any other error (e.g. a keyboard
    # interrupt) must not be reported as a missing package
    sys.exit(f"Package 'openpyxl' was not found. You might install with {install_openpyxl}")
def get_manufacturer(from_url, to_yaml, verbose=False):
    """
    Read an XLSX file from the given url and write a yaml file containing id and manufacturer.

    The first row of the active sheet is taken as header, every following row
    is expected to hold the id in the first and the manufacturer in the second
    column. Only ids with exactly 3 characters are taken into account.

    :param from_url: url to download the XLSX file from
    :param to_yaml: name of the yaml file to write the result to
    :param verbose: if True every id found will be logged
    :return: a dict with the ids as keys and the manufacturers as values,
        it is empty if the download was rejected or the workbook could not be processed
    :raises requests.exceptions.RequestException: if the url could not be fetched at all
    """
    # result
    r = {}
    y = YAML()

    logger.debug(f"Read manufacturer IDs from URL: '{from_url}'")
    logger.debug(f"Using openpyxl version '{openpyxl.__version__}'")

    headers = {'User-agent': 'Mozilla/5.0'}

    try:
        # without a timeout an unresponsive server would block forever
        reque = requests.get(from_url, headers=headers, timeout=30)
    except requests.exceptions.RequestException as e:
        # requests does not raise the builtin ConnectionError but its own exceptions
        logger.debug(f"An error {e} occurred fetching {from_url}\n")
        raise

    if not reque.ok:
        # an error page is not a workbook, there is no need to try to parse it
        logger.debug(f"Fetching {from_url} failed with HTTP status {reque.status_code}")
        return r

    try:
        wb = openpyxl.load_workbook(filename=BytesIO(reque.content), data_only=True)

        logger.debug('sheetnames {}'.format(wb.sheetnames))

        sheet = wb.active
        logger.debug(f"sheet {sheet}")
        logger.debug(f"rows [{sheet.min_row} .. {sheet.max_row}]")
        logger.debug(f"columns [{sheet.min_column} .. {sheet.max_column}]")

        if sheet.min_row + 1 <= sheet.max_row and sheet.min_column == 1 and sheet.max_column == 4:
            # skip the header row; max_row is the last row with data
            # thus it needs to be included in the range
            for row in range(sheet.min_row + 1, sheet.max_row + 1):
                flag_id = str(sheet.cell(row, 1).value).strip()
                if len(flag_id) == 3:
                    # there are entries like > 'ITRON ...' < that need special cleaning:
                    man = str(sheet.cell(row, 2).value).strip()
                    man = man.strip('\'').strip()
                    r[flag_id] = man
                    if verbose:
                        logger.debug(f"{flag_id}->{man}")
                else:
                    logger.debug(f">id< is '{flag_id}' does not have exactly 3 characters and will not be considered")

        with open(to_yaml, 'w') as f:
            y.dump(r, f)

        logger.debug(f"{len(r)} distinct manufacturers were found and written to {to_yaml}")

    except Exception as e:
        # best effort: a broken workbook or an unwritable file only gets logged
        logger.debug(f"Error {e} occurred")

    return r
# Standalone mode: download the manufacturer list and store it as yaml file
if __name__ == '__main__':
    verbose = True
    exportfile = 'manufacturer.yaml'
    url = 'https://www.dlms.com/srv/lib/Export_Flagids.php'

    # send everything down to debug level to the console
    logging.getLogger().setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    # the verbose format adds time, logger name, level and line number
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s @ %(lineno)d' if verbose else '%(message)s')
    ch.setFormatter(formatter)
    logging.getLogger().addHandler(ch)

    logger = logging.getLogger(__name__)

    get_manufacturer(url, exportfile, verbose)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment