Skip to content

Instantly share code, notes, and snippets.

@Cougar
Created May 17, 2020 20:29
Show Gist options
  • Save Cougar/450fb64e48710a860bcb0bed3ca666ae to your computer and use it in GitHub Desktop.
Save Cougar/450fb64e48710a860bcb0bed3ca666ae to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import functools
import itertools
import logging

from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from pymodbus.client.asynchronous import schedulers
from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient
#from pymodbus.client.asynchronous.serial import AsyncModbusSerialClient
from pymodbus120_async.client import AsyncModbusSerialClient
from pymodbus120.exceptions import ConnectionException, ModbusIOException
from pymodbus120.register_read_message import ReadHoldingRegistersResponse
# Configure the root logger with the default handler and let every level
# through, so the debug traces emitted by the runner classes are visible.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# Value passed as ``unit=`` with every read_holding_registers() request.
UNIT = 0x01
class ModBusRunner:
    """Poll a list of holding registers, one per request, in an endless cycle.

    The client factory is called with ``schedulers.IO_LOOP``; polling starts
    from ``_on_connect`` once the future it returns has resolved. Each request
    is guarded by a timeout: the next register is requested either when the
    current read completes or when its timeout fires, whichever comes first.
    """

    def __init__(self, ModbusClient, register_list, timeout=10, **kwargs):
        """Create the client and arrange for polling to start on connect.

        Args:
            ModbusClient: Callable invoked as
                ``ModbusClient(schedulers.IO_LOOP, **kwargs)``; it must return
                a ``(protocol, future)`` pair.
            register_list: Register addresses to poll in round-robin order.
            timeout: Delay handed to ``IOLoop.call_later`` for each request.
            **kwargs: Forwarded unchanged to ``ModbusClient``.

        Raises:
            ValueError: If ``register_list`` is empty.
        """
        # Assigned before anything that can raise so __del__ always finds them.
        self._client = None
        self._protocol = None
        self._register = self._get_next_register(register_list)
        self._timeout = timeout
        self._timeout_callback = None
        # Number of the most recently issued request; lets _on_done recognise
        # completions that belong to a request which has already timed out.
        self._request_id = 0
        self._protocol, future = ModbusClient(schedulers.IO_LOOP, **kwargs)
        future.add_done_callback(self._on_connect)

    def run(self):
        """Start the polling cycle by sending the first request."""
        self._send_request()

    def _get_next_register(self, register_list):
        """Return an endless iterator cycling through ``register_list``.

        The input is materialised first so that one-shot iterators can be
        replayed and an empty list is rejected up front instead of making the
        first ``next()`` spin forever.

        Raises:
            ValueError: If ``register_list`` is empty.
        """
        registers = tuple(register_list)
        if not registers:
            raise ValueError("register_list must contain at least one register")
        return itertools.cycle(registers)

    def _send_request(self):
        """Arm the timeout and request the next register from the client."""
        register = next(self._register)
        log.debug('_send_request(%d)', register)
        self._request_id += 1
        self._timeout_callback = IOLoop.current().call_later(self._timeout, self._on_timeout)
        log.debug('+++ timeout set: %s', self._timeout_callback)
        on_done = functools.partial(self._on_done, register, request_id=self._request_id)
        try:
            self._client.read_holding_registers(register, 1, unit=UNIT).add_done_callback(on_done)
        except StreamClosedError as ex:
            # The timeout armed above stays in place and triggers the next
            # attempt, so nothing else is rescheduled here.
            log.error(ex)

    def _on_connect(self, future):
        """Store the connected client and start polling, or log the failure."""
        log.debug('_on_connect()')
        exp = future.exception()
        if exp:
            log.error("ERROR: %s", exp)
        else:
            # Only announce the connection once it is known to have succeeded.
            log.info("Client connected")
            self._client = future.result()
            self.run()

    def _on_done(self, register, f, request_id=None):
        """Handle a finished read and schedule the next one.

        Args:
            register: Address that was requested.
            f: Completed future returned by ``read_holding_registers``.
            request_id: Number assigned by ``_send_request``; ``None`` disables
                the stale-response check.
        """
        log.debug('_on_done(%d)', register)
        exc = f.exception()
        if exc:
            log.error("ERROR: %s", exc)
        else:
            self._print(register, f.result())
        if request_id is not None and request_id != self._request_id:
            # This request already timed out and a newer one is in flight.
            # Rescheduling here would cancel the newer request's timeout and
            # start a second, parallel polling chain.
            log.debug('stale response for request %d ignored', request_id)
            return
        log.debug('--- remove_timeout()')
        IOLoop.current().remove_timeout(self._timeout_callback)
        self._send_request()

    def _on_timeout(self):
        """Log the missed response and move on to the next register."""
        log.debug('!!! _timeout()')
        log.error("TIMEOUT reading registers")
        log.debug("self._client: %s", self._client)
        log.debug("self._protocol: %s", self._protocol)
        self._send_request()

    def _print(self, register, value):
        """Log the ``bits`` or ``registers`` payload of a response.

        A value that has neither attribute is logged as an error instead.
        """
        if hasattr(value, "bits"):
            t = value.bits
        elif hasattr(value, "registers"):
            t = value.registers
        else:
            log.error(value)
            return
        log.info("Printing %d: -- %s", register, t)

    def __del__(self):
        """Close the client and stop the protocol object, if they exist."""
        log.debug('__del__()')
        if self._client:
            self._client.close()
        if self._protocol:
            self._protocol.stop()
class ModBusRunner120:
    """Poll holding registers through a client whose reads take ``addCallback``.

    Unlike a connect-then-poll design, the client is created synchronously in
    the constructor and polling starts immediately. Each request is guarded by
    a timeout: the next register is requested either when the current read
    calls back or when its timeout fires, whichever comes first.
    """

    def __init__(self, ModbusClient, register_list, timeout=10, **kwargs):
        """Create the client and immediately start polling.

        Args:
            ModbusClient: Callable invoked as ``ModbusClient(**kwargs)``; the
                returned object is used as the client.
            register_list: Register addresses to poll in round-robin order.
            timeout: Delay handed to ``IOLoop.call_later`` for each request.
            **kwargs: Forwarded unchanged to ``ModbusClient``.

        Raises:
            ValueError: If ``register_list`` is empty.
        """
        # Assigned before anything that can raise so __del__ always finds it.
        self._client = None
        self._register = self._get_next_register(register_list)
        self._timeout = timeout
        self._timeout_callback = None
        # Number of the most recently issued request; lets _on_done recognise
        # callbacks that belong to a request which has already timed out.
        self._request_id = 0
        self._client = ModbusClient(**kwargs)
        self.run()

    def run(self):
        """Start the polling cycle by sending the first request."""
        self._send_request()

    def _get_next_register(self, register_list):
        """Return an endless iterator cycling through ``register_list``.

        The input is materialised first so that one-shot iterators can be
        replayed and an empty list is rejected up front instead of making the
        first ``next()`` spin forever.

        Raises:
            ValueError: If ``register_list`` is empty.
        """
        registers = tuple(register_list)
        if not registers:
            raise ValueError("register_list must contain at least one register")
        return itertools.cycle(registers)

    def _send_request(self):
        """Arm the timeout and request the next register from the client."""
        register = next(self._register)
        log.debug('_send_request(%d)', register)
        self._request_id += 1
        self._timeout_callback = IOLoop.current().call_later(self._timeout, self._on_timeout)
        log.debug('+++ timeout set: %s', self._timeout_callback)
        on_done = functools.partial(self._on_done, register, request_id=self._request_id)
        try:
            self._client.read_holding_registers(register, 1, unit=UNIT).addCallback(on_done)
        except StreamClosedError as ex:
            # The timeout armed above stays in place and triggers the next
            # attempt, so nothing else is rescheduled here.
            log.error(ex)

    def _on_done(self, register, f, request_id=None):
        """Handle the callback value of a read and schedule the next one.

        Args:
            register: Address that was requested.
            f: Value delivered to the callback; anything other than a
                ``ReadHoldingRegistersResponse`` is logged as an error.
            request_id: Number assigned by ``_send_request``; ``None`` disables
                the stale-response check.
        """
        log.debug('_on_done(%d)', register)
        if isinstance(f, ReadHoldingRegistersResponse):
            # Hand over the response itself: _print extracts ``.registers``
            # and would log a bare list as an error.
            self._print(register, f)
        else:
            log.error("ERROR: %s", f)
        if request_id is not None and request_id != self._request_id:
            # This request already timed out and a newer one is in flight.
            # Rescheduling here would cancel the newer request's timeout and
            # start a second, parallel polling chain.
            log.debug('stale response for request %d ignored', request_id)
            return
        log.debug('--- remove_timeout()')
        IOLoop.current().remove_timeout(self._timeout_callback)
        self._send_request()

    def _on_timeout(self):
        """Log the missed response and move on to the next register."""
        log.debug('!!! _timeout()')
        log.error("TIMEOUT reading registers")
        log.debug("self._client: %s", self._client)
        self._send_request()

    def _print(self, register, value):
        """Log the ``bits`` or ``registers`` payload of a response.

        A value that has neither attribute is logged as an error instead.
        """
        if hasattr(value, "bits"):
            t = value.bits
        elif hasattr(value, "registers"):
            t = value.registers
        else:
            log.error(value)
            return
        log.info("Printing %d: -- %s", register, t)

    def __del__(self):
        """Close the client, if one was created."""
        log.debug('__del__()')
        if self._client:
            self._client.close()
if __name__ == "__main__":
    # IOLoop.configure('tornado.platform.epoll.EPollIOLoop')
    # Two TCP pollers and one serial (RTU) poller; each constructor sets up
    # its own client, so the instances do not need to be kept in variables.
    ModBusRunner(AsyncModbusTCPClient, [8, 10], host='127.0.0.1', port=5020)
    ModBusRunner(AsyncModbusTCPClient, [149, 150], host='192.168.1.151', port=502)
    ModBusRunner120(AsyncModbusSerialClient, [149, 150], method='rtu', port='/dev/ttyUSB2', baudrate=19200, parity='E')
    # IOLoop.instance() is a deprecated alias; use IOLoop.current() like the
    # runner classes do, so the timeouts are scheduled on the loop started here.
    IOLoop.current().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment