Skip to content

Instantly share code, notes, and snippets.

@ASolchen
Created April 12, 2024 01:57
Show Gist options
  • Save ASolchen/7608a0fb4e32cfac1894644d893d21e6 to your computer and use it in GitHub Desktop.
Save ASolchen/7608a0fb4e32cfac1894644d893d21e6 to your computer and use it in GitHub Desktop.
Kivy UI for testing modbus TCP clients
#!/usr/bin/env python3
#pip install kivy and pyModbusTCP
__version__ = '1.0'
import kivy
kivy.require('2.2.0')
from kivy.app import App
from kivy.clock import Clock
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.relativelayout import RelativeLayout
from kivy.uix.gridlayout import GridLayout
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.textinput import TextInput
from kivy.uix.label import Label
from kivy.uix.button import Button
from kivy.uix.slider import Slider
from kivy.properties import BooleanProperty, NumericProperty
from pyModbusTCP.server import ModbusServer, DataBank
from threading import Thread
from functools import partial
FUNC_COIL = 0
FUNC_DI = 1
FUNC_HR = 3
FUNC_IR = 4
class SimDataBank(DataBank):
"""A custom ModbusServerDataBank for override on_xxx_change methods."""
def on_coils_change(self, address, from_value, to_value, srv_info):
"""Call by server when change occur on coils space."""
msg = 'change in coil space [{0!r:^5} > {1!r:^5}] at @ 0x{2:04X} from ip: {3:<15}'
msg = msg.format(from_value, to_value, address, srv_info.client.address)
def on_holding_registers_change(self, address, from_value, to_value, srv_info):
"""Call by server when change occur on holding registers space."""
msg = 'change in hreg space [{0!r:^5} > {1!r:^5}] at @ 0x{2:04X} from ip: {3:<15}'
msg = msg.format(from_value, to_value, address, srv_info.client.address)
class MbServer():
def __init__(self, root) -> None:
self.root = root
self.server_thread = Thread(target=self.start_server_thread, daemon=True)
self.server = ModbusServer(host='0.0.0.0', port=502, data_bank=SimDataBank())
def start_server_thread(self):
self.root.server_running = True
self.server.start()
def start_server(self):
if not self.server_thread.is_alive():
self.server_thread.start()
def stop_server(self):
self.root.server_running = False
if self.server_thread.is_alive():
self.server.stop()
self.server_thread.join()
self.server_thread = Thread(target=self.start_server_thread) #queue up a new thread
class Root(RelativeLayout):
server_running = BooleanProperty(False, allownone=False)
offset = NumericProperty(0)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.connected_status = False
self.mb_server = MbServer(self)
self.size = (640, 480)
self.func = FUNC_COIL
self.offset = 0
self.w_dict = { #all root's widgets
"start-btn": Button(text="Start", size_hint=(.2,0.08), pos_hint={'x':.05, 'y':.85}),
"stop-btn": Button(text="Stop", size_hint=(.2,0.08), pos_hint={'x':.25, 'y':.85}, disabled=True,),
"stat-lbl": Label(text="Sever Stopped", size_hint=(.4,0.08), pos_hint={'x':.05, 'y':.92}),
"offset-slider": Slider(min=0, max=9900, step=100, value=self.offset, size_hint=(0.9,0.1), pos_hint={'x':.05, 'y':.65}),
"coil-select" : Button(text="Coils", size_hint=(.2,0.08), pos_hint={'x':.05, 'y':.75}),
"di-select" : Button(text="Input Stats", size_hint=(.2,0.08), pos_hint={'x':.25, 'y':.75}),
"ir-select" : Button(text="Input Regs", size_hint=(.2,0.08), pos_hint={'x':.45, 'y':.75}),
"hr-select" : Button(text="Holding Regs", size_hint=(.2,0.08), pos_hint={'x':.65, 'y':.75}),
"func-lbl" : Label(text="Coils", size_hint=(.2,0.08), pos_hint={'x':.35, 'y':.58}),
}
def update_offset(instance, value):
self.offset = int(value)
self.w_dict['offset-slider'].fbind('value', update_offset)
self.w_dict["coil-select"].on_press = self.set_func_coil
self.w_dict["di-select"].on_press = self.set_func_di
self.w_dict["ir-select"].on_press = self.set_func_ir
self.w_dict["hr-select"].on_press = self.set_func_hr
def set_stat(instances, value):
lbl = f"Server running on port {self.mb_server.server.port}" if value else "Sever Stopped"
self.w_dict["stat-lbl"].text = lbl
self.w_dict["start-btn"].disabled = bool(value)
self.w_dict["stop-btn"].disabled = not bool(value)
self.bind(server_running=set_stat)
self.w_dict["start-btn"].on_press = self.mb_server.start_server
self.w_dict["stop-btn"].on_press = self.mb_server.stop_server
# layout the widgets
[self.add_widget(self.w_dict[w]) for w in self.w_dict]
#value labels
self.val_labels_x = []
for x in range(10):
w = Label(text=f"+{x}", size_hint=(.08,0.05), pos_hint={'x':0.1 + x*0.08, 'y':0.55})
self.val_labels_x.append(w)
self.add_widget(w)
self.val_labels_y = []
for y in range(10):
w = Label(text=f"{y*10}", size_hint=(.08,0.05), pos_hint={'x':0.02, 'y':0.5-y * 0.05})
self.val_labels_y.append(w)
self.add_widget(w)
self.bind(offset=self.update_val_labels)
#values
def val_changed(idx, instance, focus):
if not focus:
addr = self.offset + idx
self.set_value(addr, instance.text)
self.value_widgets = []
for y in range(10):
for x in range(10):
idx = x+y*10
w = TextInput(text=f"--", size_hint=(.08,0.05), pos_hint={'x':0.1 + x*0.08, 'y':0.5-y * 0.05})
w.bind(focus=partial(val_changed, idx))
self.value_widgets.append(w)
self.add_widget(w)
#schedule updates
Clock.schedule_interval(self.update_vals, 0.5)
def update_vals(self, dt):
for btn in [self.w_dict["coil-select"],
self.w_dict["di-select"],
self.w_dict["ir-select"],
self.w_dict["hr-select"]]:
btn.background_color = [1, 1, 1, 1]
if not self.server_running:
vals = ["--"]*100
else:
if self.func == FUNC_COIL:
self.w_dict["coil-select"].background_color = [1, 0, 0, 1]
vals = [int(bit) for bit in self.mb_server.server.data_bank.get_coils(self.offset, 100)]
if self.func == FUNC_DI:
self.w_dict["di-select"].background_color = [1, 0, 0, 1]
vals = [int(bit) for bit in self.mb_server.server.data_bank.get_discrete_inputs(self.offset, 100)]
if self.func == FUNC_HR:
self.w_dict["hr-select"].background_color = [1, 0, 0, 1]
vals = self.mb_server.server.data_bank.get_holding_registers(self.offset, 100)
if self.func == FUNC_IR:
self.w_dict["ir-select"].background_color = [1, 0, 0, 1]
vals = self.mb_server.server.data_bank.get_input_registers(self.offset, 100)
for idx, val in enumerate(vals):
if not self.value_widgets[idx].focus:
self.value_widgets[idx].text = str(val)
def update_val_labels(self, instance, value):
for x in range(10):
self.val_labels_x[x].text = f"+{x}"
for y in range(10):
self.val_labels_y[y].text = f"{int(value + y*10)}"
def set_func_coil(self, *args):
self.func = FUNC_COIL
self.w_dict["func-lbl"].text = "Coils"
def set_func_di(self, *args):
self.func = FUNC_DI
self.w_dict["func-lbl"].text = "Discrete Status Registers"
def set_func_ir(self, *args):
self.func = FUNC_IR
self.w_dict["func-lbl"].text = "Input Registers"
def set_func_hr(self, *args):
self.func = FUNC_HR
self.w_dict["func-lbl"].text = "Holding Registers"
def set_value(self, addr, text):
"""text comes in from entry. Parse to see if valid for function and number types"""
if not self.server_running:
return
else:
if self.func == FUNC_COIL:
try:
val = bool(int(text))
self.mb_server.server.data_bank.set_coils(addr, [val])
except:
ValueError()
if self.func == FUNC_DI:
try:
val = bool(int(text))
self.mb_server.server.data_bank.set_discrete_inputs(addr, [val])
except:
ValueError()
if self.func == FUNC_HR:
try:
val = max(0, min(2**16, int(text)))
self.mb_server.server.data_bank.set_holding_registers(addr, [val])
except:
ValueError()
if self.func == FUNC_IR:
try:
val = max(0, min(2**16, int(text)))
self.mb_server.server.data_bank.set_input_registers(addr, [val])
except:
ValueError()
class ModsimApp(App):
title = 'Py Mod Sim'
icon = 'icon.png'
def build(self):
return Root()
if __name__ == '__main__':
ModsimApp().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment