Created
April 12, 2024 01:57
-
-
Save ASolchen/7608a0fb4e32cfac1894644d893d21e6 to your computer and use it in GitHub Desktop.
Kivy UI for testing Modbus TCP clients
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Install the dependencies with: pip install kivy pyModbusTCP
__version__ = '1.0' | |
import kivy | |
kivy.require('2.2.0') | |
from kivy.app import App | |
from kivy.clock import Clock | |
from kivy.uix.floatlayout import FloatLayout | |
from kivy.uix.relativelayout import RelativeLayout | |
from kivy.uix.gridlayout import GridLayout | |
from kivy.uix.boxlayout import BoxLayout | |
from kivy.uix.textinput import TextInput | |
from kivy.uix.label import Label | |
from kivy.uix.button import Button | |
from kivy.uix.slider import Slider | |
from kivy.properties import BooleanProperty, NumericProperty | |
from pyModbusTCP.server import ModbusServer, DataBank | |
from threading import Thread | |
from functools import partial | |
# Selectors for the data table currently shown in the UI (coils, discrete
# inputs, holding registers, input registers); compared against Root.func.
FUNC_COIL, FUNC_DI, FUNC_HR, FUNC_IR = 0, 1, 3, 4
class SimDataBank(DataBank):
    """A custom DataBank whose on_xxx_change hooks log each reported change."""

    @staticmethod
    def _log_change(space, address, from_value, to_value, srv_info):
        """Log a single value change at INFO level.

        Args:
            space: Short name of the data space ('coil' or 'hreg').
            address: Address of the value that changed.
            from_value: Value before the change.
            to_value: Value after the change.
            srv_info: Object passed by the server; ``srv_info.client.address``
                is reported as the originating IP.
        """
        # Function-scope import keeps this block free of a new file-level import.
        import logging

        msg = 'change in {4} space [{0!r:^5} > {1!r:^5}] at @ 0x{2:04X} from ip: {3:<15}'
        logging.getLogger(__name__).info(
            msg.format(from_value, to_value, address, srv_info.client.address, space)
        )

    def on_coils_change(self, address, from_value, to_value, srv_info):
        """Call by server when change occur on coils space."""
        # The message used to be formatted and then dropped; emit it instead.
        self._log_change('coil', address, from_value, to_value, srv_info)

    def on_holding_registers_change(self, address, from_value, to_value, srv_info):
        """Call by server when change occur on holding registers space."""
        self._log_change('hreg', address, from_value, to_value, srv_info)
class MbServer:
    """Run the Modbus server on a background thread on behalf of the UI.

    ``root`` is the object whose ``server_running`` attribute mirrors whether
    the server loop is currently executing.
    """

    def __init__(self, root) -> None:
        """Create the server (listening on all interfaces, port 502) and its thread.

        Args:
            root: Object exposing a writable ``server_running`` attribute.
        """
        self.root = root
        self.server_thread = self._new_thread()
        self.server = ModbusServer(host='0.0.0.0', port=502, data_bank=SimDataBank())

    def _new_thread(self):
        """Return a fresh, unstarted daemon thread that runs the server loop."""
        # Always a daemon so a running server never keeps the process alive.
        return Thread(target=self.start_server_thread, daemon=True)

    def start_server_thread(self):
        """Thread target: flag the server as running for as long as it serves.

        NOTE(review): ``server_running`` is written from this worker thread, so
        any observers of that attribute also run here -- confirm that is
        acceptable for the UI toolkit.
        """
        self.root.server_running = True
        try:
            self.server.start()
        finally:
            # Reached on normal stop and when start() raises (e.g. the port
            # could not be bound); either way the server is no longer running.
            self.root.server_running = False

    def start_server(self):
        """Start the server thread unless it is already running."""
        if self.server_thread.is_alive():
            return
        if self.server_thread.ident is not None:
            # The thread already ran and exited; Thread objects are single-use,
            # so calling start() on it again would raise RuntimeError.
            self.server_thread = self._new_thread()
        self.server_thread.start()

    def stop_server(self):
        """Stop the server, wait for its thread, and queue up a new thread."""
        self.root.server_running = False
        if self.server_thread.is_alive():
            self.server.stop()
            self.server_thread.join()
            self.server_thread = self._new_thread()
class Root(RelativeLayout):
    """Main window: server controls plus a 10x10 grid of editable values.

    The grid shows 100 consecutive addresses of the selected data table,
    starting at ``offset``. It is refreshed from the server's data bank every
    0.5 s, and an edited cell is written back when it loses focus.
    """

    # Mirrors whether the Modbus server is serving; observers update the
    # status label and the Start/Stop buttons.
    server_running = BooleanProperty(False, allownone=False)
    # Address shown in the first grid cell; driven by the offset slider.
    offset = NumericProperty(0)

    # Number of cells in the grid (10 rows x 10 columns).
    _CELL_COUNT = 100
    # Largest value a 16-bit register can hold.
    _WORD_MAX = 0xFFFF
    # Status text shown while the server is not running.
    _STOPPED_TEXT = "Server Stopped"
    # Background colours for the table-select buttons.
    _COLOR_IDLE = [1, 1, 1, 1]
    _COLOR_ACTIVE = [1, 0, 0, 1]
    # Which select button belongs to which table selector.
    _SELECT_KEYS = {
        FUNC_COIL: "coil-select",
        FUNC_DI: "di-select",
        FUNC_HR: "hr-select",
        FUNC_IR: "ir-select",
    }

    def __init__(self, **kwargs):
        """Build all widgets, wire their callbacks and schedule the refresh."""
        super().__init__(**kwargs)
        self.connected_status = False
        self.mb_server = MbServer(self)
        self.size = (640, 480)
        self.func = FUNC_COIL
        self.offset = 0
        self._build_controls()
        self._build_axis_labels()
        self._build_value_grid()
        # schedule updates
        Clock.schedule_interval(self.update_vals, 0.5)

    def _build_controls(self):
        """Create the buttons, slider and labels and connect their handlers."""
        self.w_dict = {  # all root's widgets
            "start-btn": Button(text="Start", size_hint=(.2, 0.08), pos_hint={'x': .05, 'y': .85}),
            "stop-btn": Button(text="Stop", size_hint=(.2, 0.08), pos_hint={'x': .25, 'y': .85}, disabled=True),
            "stat-lbl": Label(text=self._STOPPED_TEXT, size_hint=(.4, 0.08), pos_hint={'x': .05, 'y': .92}),
            "offset-slider": Slider(min=0, max=9900, step=100, value=self.offset, size_hint=(0.9, 0.1), pos_hint={'x': .05, 'y': .65}),
            "coil-select": Button(text="Coils", size_hint=(.2, 0.08), pos_hint={'x': .05, 'y': .75}),
            "di-select": Button(text="Input Stats", size_hint=(.2, 0.08), pos_hint={'x': .25, 'y': .75}),
            "ir-select": Button(text="Input Regs", size_hint=(.2, 0.08), pos_hint={'x': .45, 'y': .75}),
            "hr-select": Button(text="Holding Regs", size_hint=(.2, 0.08), pos_hint={'x': .65, 'y': .75}),
            "func-lbl": Label(text="Coils", size_hint=(.2, 0.08), pos_hint={'x': .35, 'y': .58}),
        }
        self.w_dict['offset-slider'].fbind('value', self._on_offset_slider)
        self.w_dict["coil-select"].on_press = self.set_func_coil
        self.w_dict["di-select"].on_press = self.set_func_di
        self.w_dict["ir-select"].on_press = self.set_func_ir
        self.w_dict["hr-select"].on_press = self.set_func_hr
        self.bind(server_running=self._on_server_running)
        self.w_dict["start-btn"].on_press = self.mb_server.start_server
        self.w_dict["stop-btn"].on_press = self.mb_server.stop_server
        # layout the widgets
        for widget in self.w_dict.values():
            self.add_widget(widget)

    def _build_axis_labels(self):
        """Create the column (+0..+9) and row (address) header labels."""
        self.val_labels_x = []
        for x in range(10):
            w = Label(text=f"+{x}", size_hint=(.08, 0.05), pos_hint={'x': 0.1 + x*0.08, 'y': 0.55})
            self.val_labels_x.append(w)
            self.add_widget(w)
        self.val_labels_y = []
        for y in range(10):
            w = Label(text=f"{y*10}", size_hint=(.08, 0.05), pos_hint={'x': 0.02, 'y': 0.5-y * 0.05})
            self.val_labels_y.append(w)
            self.add_widget(w)
        # Row headers show absolute addresses, so they follow the offset.
        self.bind(offset=self.update_val_labels)

    def _build_value_grid(self):
        """Create the 100 text inputs, row by row, each bound to its grid index."""
        self.value_widgets = []
        for y in range(10):
            for x in range(10):
                idx = x + y*10
                w = TextInput(text="--", size_hint=(.08, 0.05), pos_hint={'x': 0.1 + x*0.08, 'y': 0.5-y * 0.05})
                # partial pins idx now; a plain closure would see only the last idx.
                w.bind(focus=partial(self._on_cell_focus, idx))
                self.value_widgets.append(w)
                self.add_widget(w)

    def _on_offset_slider(self, instance, value):
        """Store the slider position as the integer grid offset."""
        self.offset = int(value)

    def _on_server_running(self, instance, value):
        """Reflect the running state in the status label and Start/Stop buttons."""
        running = bool(value)
        if running:
            self.w_dict["stat-lbl"].text = f"Server running on port {self.mb_server.server.port}"
        else:
            self.w_dict["stat-lbl"].text = self._STOPPED_TEXT
        self.w_dict["start-btn"].disabled = running
        self.w_dict["stop-btn"].disabled = not running

    def _on_cell_focus(self, idx, instance, focus):
        """Write a cell's text to the data bank when the cell loses focus."""
        if not focus:
            self.set_value(self.offset + idx, instance.text)

    def _read_table(self):
        """Return the 100 values of the selected table starting at ``offset``.

        Coils and discrete inputs are converted to ints for display. Returns
        None when the data bank yields no data or the selector is unknown.
        """
        bank = self.mb_server.server.data_bank
        start, count = self.offset, self._CELL_COUNT
        if self.func == FUNC_COIL:
            bits = bank.get_coils(start, count)
        elif self.func == FUNC_DI:
            bits = bank.get_discrete_inputs(start, count)
        elif self.func == FUNC_HR:
            return bank.get_holding_registers(start, count)
        elif self.func == FUNC_IR:
            return bank.get_input_registers(start, count)
        else:
            return None
        return None if bits is None else [int(bit) for bit in bits]

    def update_vals(self, dt):
        """Refresh the select-button highlight and every unfocused grid cell.

        Args:
            dt: Elapsed time supplied by the Clock; unused.
        """
        for key in self._SELECT_KEYS.values():
            self.w_dict[key].background_color = list(self._COLOR_IDLE)
        vals = None
        if self.server_running:
            active_key = self._SELECT_KEYS.get(self.func)
            if active_key is not None:
                self.w_dict[active_key].background_color = list(self._COLOR_ACTIVE)
            vals = self._read_table()
        if vals is None:
            # Server stopped, or nothing could be read: show placeholders.
            vals = ["--"] * self._CELL_COUNT
        for widget, val in zip(self.value_widgets, vals):
            # Never overwrite the cell the user is currently editing.
            if not widget.focus:
                widget.text = str(val)

    def update_val_labels(self, instance, value):
        """Relabel the row headers with absolute addresses for the new offset."""
        for x in range(10):
            self.val_labels_x[x].text = f"+{x}"
        for y in range(10):
            self.val_labels_y[y].text = f"{int(value + y*10)}"

    def set_func_coil(self, *args):
        """Select the coils table."""
        self.func = FUNC_COIL
        self.w_dict["func-lbl"].text = "Coils"

    def set_func_di(self, *args):
        """Select the discrete inputs table."""
        self.func = FUNC_DI
        self.w_dict["func-lbl"].text = "Discrete Status Registers"

    def set_func_ir(self, *args):
        """Select the input registers table."""
        self.func = FUNC_IR
        self.w_dict["func-lbl"].text = "Input Registers"

    def set_func_hr(self, *args):
        """Select the holding registers table."""
        self.func = FUNC_HR
        self.w_dict["func-lbl"].text = "Holding Registers"

    def _clamp_word(self, number):
        """Clamp ``number`` into the 16-bit register range 0..65535."""
        return max(0, min(self._WORD_MAX, number))

    def set_value(self, addr, text):
        """Parse ``text`` from a grid cell and store it at ``addr``.

        Bit tables store ``bool(int(text))``; register tables store the
        integer clamped to 0..65535. Nothing is written while the server is
        stopped or when ``text`` is not an integer.
        """
        if not self.server_running:
            return
        try:
            number = int(text)
        except ValueError:
            # Not an integer (e.g. the "--" placeholder): ignore the edit.
            return
        bank = self.mb_server.server.data_bank
        if self.func == FUNC_COIL:
            bank.set_coils(addr, [bool(number)])
        elif self.func == FUNC_DI:
            bank.set_discrete_inputs(addr, [bool(number)])
        elif self.func == FUNC_HR:
            bank.set_holding_registers(addr, [self._clamp_word(number)])
        elif self.func == FUNC_IR:
            bank.set_input_registers(addr, [self._clamp_word(number)])
class ModsimApp(App):
    """Application object; its root widget is a ``Root`` layout."""

    # Icon file and window title used by the application.
    icon = 'icon.png'
    title = 'Py Mod Sim'

    def build(self):
        """Create and return the root widget."""
        root_widget = Root()
        return root_widget
if __name__ == '__main__':
    # Launch the UI only when executed as a script, not on import.
    app = ModsimApp()
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment