Last active
March 25, 2021 08:15
-
-
Save philippkraft/f941a7fe2332158724236ecccb25edaf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A simple and ugly tool to program SDI12 sensors | |
@author: philippkraft | |
""" | |
# The serial port, e.g. 'COM6' on Windows or '/dev/ttyUSB1' on Linux.
# None means: use the mock port (MockSerial) instead of real hardware
port = None
# Timeout for the serial port in seconds; MockSerial sleeps this long
# before it answers a read
timeout = 0.3
import contextlib
import time
import tkinter as tk
from typing import Iterator, Optional

import serial
class MockSerial:
    """
    Mocks a serial port for testing purposes

    Only the parts of a serial port used in this file exist: ``port``,
    ``write`` and ``read``. ``write`` remembers the last command and
    ``read`` answers it with a canned response.

    Parameters
    ----------
    delay : float, optional
        Seconds ``read`` sleeps before it answers. With None (the default)
        the module level ``timeout`` is used.
    """
    port = 'COMock'
    # Class level defaults, so that read() works before the first write()
    adress = ''
    value = ''

    def __init__(self, delay=None):
        self.delay = delay

    def write(self, data: bytes):
        """
        Stores the command and echoes it on stdout

        The first character is kept as the adress, the rest (including the
        terminating '!') as the command.
        """
        content = data.decode()
        # Slicing instead of indexing: empty data must not raise
        self.adress = content[:1]
        self.value = content[1:]
        print(content)

    def read(self, n: int = 0):
        """
        Returns the canned response to the last written command as bytes

        ``n`` is ignored. If nothing has been written yet, ``b''`` is
        returned.
        """
        time.sleep(timeout if self.delay is None else self.delay)
        adress = self.adress
        # First character of the command, '' if there is none
        command = self.value[:1]
        if adress == '?':
            # Adress query: the mock device always sits on adress 0
            return b'0'
        if command == 'I':
            return f'{adress}MOCKDevice.ILR.000'.encode()
        if command == 'A':
            # Change adress: answer with the new adress
            return self.value[1:2].encode()
        if command == 'R':
            return (adress + '+10.3-17.4').encode()
        # Anything else is echoed back
        return (adress + self.value).encode()
class SDI12:
    """
    A simple wrapper to create SDI12 commands

    The wrapped ``stream`` needs a ``write(bytes)`` and a ``read(int)``
    method and a ``port`` attribute, e.g. a serial port or MockSerial.
    """

    def __init__(self, stream):
        self.stream = stream

    def __call__(self, adress: str, *command: str):
        """
        Sends a command to the stream and reads the response

        Parameters
        ----------
        adress :
            Sensor adress, put in front of the command
        *command :
            Command parts, joined without separator. The terminating '!'
            is appended here.

        Returns
        -------
        tuple of (str, str)
            The command as sent and the response without surrounding
            whitespace

        Raises
        ------
        UnicodeEncodeError
            If adress or command contain non ASCII characters
        """
        payload = ''.join(command)
        cmd = f'{adress}{payload}!'.encode('ascii')
        self.stream.write(cmd)
        response = self.stream.read(100)
        # Noise on the line can produce bytes that are no valid text. Show
        # them as replacement characters instead of raising in the caller
        text = response.decode(errors='replace').strip()
        return cmd.decode('ascii'), text

    def __repr__(self):
        # f-string: works also if the port name is not a str (e.g. None)
        return f'SDI12@{self.stream.port}'
class Application(tk.Frame):
    """
    The graphic user interface

    One row per action (scan the bus, sensor info, change adress, run any
    other command, read the buffer) and a status line below the rows that
    shows the last command with its response or a warning.
    """

    def __init__(self, sdi12: 'SDI12'):
        """
        Creates the root window and builds the form

        Parameters
        ----------
        sdi12 :
            Called as ``sdi12(adress, *command)``, must return the tuple
            (sent command, response). Its ``stream`` attribute is read
            directly by ``read_buffer``.
        """
        self.master = tk.Tk()
        # The window title is the text representation of the connection
        self.master.title(str(sdi12))
        super().__init__(self.master)
        self.sdi12 = sdi12
        self.pack()
        self.create_widgets()

    def grid_layout(self, widgets):
        """
        Places the widgets on the grid

        Widgets is a list of list of tk widgets, the outer list holds the
        rows, the inner lists the columns. Empty entries leave the cell
        free.
        """
        for row, row_list in enumerate(widgets):
            for col, widget in enumerate(row_list):
                if widget:
                    widget.grid(column=col, row=row, ipadx=5, pady=5, sticky=tk.W)

    def create_widgets(self):
        """Creates the tk variables, buttons, entries and labels"""
        self.adress = tk.StringVar()
        self.new_adress = tk.StringVar()
        self.command = tk.StringVar()
        self.response_label = tk.Label(self)
        # Remember the default colors of the label, log() restores them
        # after a warning. A fixed name like 'SystemButtonFace' is only
        # known by Tk on Windows.
        self._default_bg = self.response_label.cget('bg')
        self._default_fg = self.response_label.cget('fg')
        self.info_label = tk.Label(self)
        self.widgets = [
            [
                tk.Button(self, text='adress:', command=self.scan_bus),
                tk.Entry(self, width=3, textvariable=self.adress)
            ],
            [
                tk.Button(self, text='info', command=self.info),
                self.info_label
            ],
            [
                tk.Button(self, text='change adress', command=self.change_adress),
                tk.Entry(self, width=3, textvariable=self.new_adress)
            ],
            [
                tk.Button(self, text='run other command', command=self.do),
                tk.Entry(self, width=10, textvariable=self.command)
            ],
            [
                tk.Button(self, text='Read buffer', command=self.read_buffer)
            ]
        ]
        self.grid_layout(self.widgets)
        # The status line goes below the last row and spans both columns
        self.response_label.grid(column=0, row=len(self.widgets), columnspan=2)

    def log(self, *text):
        """Writes a message on the window"""
        self.response_label['bg'] = self._default_bg
        self.response_label['fg'] = self._default_fg
        self.response_label['text'] = ' '.join(text)

    def warning(self, *text):
        """Writes a warning on the window"""
        self.response_label['bg'] = 'red'
        self.response_label['fg'] = 'white'
        self.response_label['text'] = ' '.join(text)

    def scan_bus(self):
        """Scans all adresses, use only with a single device"""
        c, r = self.sdi12('', '?')
        self.adress.set(r)
        self.log(c, '->', r)

    def info(self):
        """Run the SDI12 (I)nfo command and print the information"""
        a = self.adress.get()
        if not a:
            self.warning('No adress!')
            return
        c, r = self.sdi12(a, 'I')
        self.log(c, '->', r)
        self.info_label['text'] = r

    def change_adress(self):
        """
        Change the adress

        Sends the (A) command with the new adress. The response becomes
        the current adress. Without a response both adress fields keep
        their content.
        """
        a = self.adress.get()
        if not a:
            self.warning('No adress!')
            return
        n = self.new_adress.get()
        if not n:
            self.warning('No new adress!')
            return
        c, r = self.sdi12(a, 'A', n)
        if not r:
            # No answer: do not overwrite the adress with an empty string
            self.warning(c, '->', 'no response')
            return
        self.log(c, '->', r)
        self.adress.set(r)
        self.new_adress.set('')
        # The old info text belongs to the old adress
        self.info_label['text'] = ''

    def do(self):
        """Perform any command"""
        a = self.adress.get()
        if not a:
            self.warning('No adress!')
            return
        command = self.command.get()
        if not command:
            self.warning('No command!')
            return
        c, r = self.sdi12(a, command)
        self.log(c, ' -> ', r)

    def read_buffer(self):
        """Read and clear the buffer"""
        # Bytes that are no valid text are shown as replacement characters
        r = self.sdi12.stream.read(1000).decode(errors='replace')
        self.log(r)
@contextlib.contextmanager
def open_sdi12(port: Optional[str], timeout: float) -> Iterator[SDI12]:
    """
    Opens a serial port (or MockSerial if no port is given) and yields
    an SDI12 object

    Use it in a with statement. A real port is closed when the with block
    is left, also if the block raises.

    Parameters
    ----------
    port :
        The port name. None (or an empty string) for an offline test case,
        on Windows something like 'COM6' and on linux / MacOS eg.
        '/dev/ttyUSB0'
    timeout : float
        Time to scan for a response, passed to the serial port. Not used
        for the offline test case.

    Yields
    ------
    SDI12
        A new SDI12 object with an open serial connection
    """
    if not port:
        yield SDI12(MockSerial())
        return
    # Opened outside of the try block: if opening fails, there is nothing
    # to close and no message to print
    connection = serial.Serial(port, timeout=timeout)
    try:
        yield SDI12(connection)
    finally:
        connection.close()
        print(f'{port} closed')
if __name__ == '__main__':
    # Run the GUI on the configured port, the connection is closed when
    # the window is closed
    with open_sdi12(port, timeout) as bus:
        Application(bus).mainloop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment