Last active
February 7, 2024 01:51
-
-
Save FFY00/19d794e54a9683c801e04404a39dc9bb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import concurrent.futures | |
import contextlib | |
import datetime | |
import decimal | |
import functools | |
import time | |
import threading | |
import traceback | |
from collections.abc import Iterator | |
from typing import Callable, ClassVar | |
import influxdb_client | |
import rich.text | |
import vxi11 | |
import textual.app | |
import textual.widgets | |
# DB | |
# Connection to the InfluxDB server that receives every measurement.
# The token is a placeholder in this file and has to be filled in.
_DB_URL = 'http://localhost:8086'
_DB_TOKEN = '...'
db_client = influxdb_client.InfluxDBClient(url=_DB_URL, token=_DB_TOKEN)

# Write API created with the SYNCHRONOUS write options.
db_write_api = db_client.write_api(
    write_options=influxdb_client.client.write_api.SYNCHRONOUS,
)
def db_write(point: influxdb_client.Point) -> None:
    """Send a single point to InfluxDB through the module-level write API.

    The bucket and organisation are placeholders in this file.
    """
    destination = {'bucket': '...', 'org': '...'}
    db_write_api.write(record=point, **destination)
# Types | |
class InstrumentTask:
    """Base class for an instrument that is initialised once and then polled.

    Subclasses pass their identifier as a class keyword, e.g.
    ``class Foo(InstrumentTask, instrument_id='foo')``, and override
    ``init``, ``read`` and (optionally) ``quit``.

    ``update_status`` and ``update_message`` are no-op hooks; an owner may
    replace them on the instance with callables taking the same single
    argument in order to receive progress reports.
    """

    # Identifier of the instrument type, filled in by ``__init_subclass__``.
    instrument_id: ClassVar[str]

    def __init_subclass__(cls, instrument_id: str, **kwargs: object) -> None:
        """Store the ``instrument_id`` class keyword on the new subclass."""
        super().__init_subclass__(**kwargs)
        cls.instrument_id = instrument_id

    def __init__(
        self, io: vxi11.Instrument,
        io_lock: threading.Lock | None = None,
    ) -> None:
        """Wrap ``io``.

        ``io_lock`` serialises access to the connection; pass the same lock
        to several tasks to make them take turns. A private lock is created
        when none is given.
        """
        self._io = io
        self._io_lock = threading.Lock() if io_lock is None else io_lock

    @property
    @contextlib.contextmanager
    def io(self) -> Iterator[vxi11.Instrument]:
        """Context manager yielding the opened instrument while holding the lock.

        The connection is closed again when the ``with`` body finishes, also
        when the body raises, so a failed command does not leave it open.
        """
        with self._io_lock:
            self._io.open()
            try:
                yield self._io
            finally:
                self._io.close()

    def update_status(self, status: bool | None) -> None:
        """Report the task status; does nothing unless replaced on the instance."""

    def update_message(self, message: textual.widgets.CellType) -> None:
        """Report a progress message; does nothing unless replaced on the instance."""

    def init(self) -> None:
        """Prepare the instrument for measurements. Subclasses override this."""

    def read(self) -> None:
        """Take and store one reading. Subclasses override this."""

    def quit(self) -> None:
        """Release the instrument when monitoring stops. Subclasses may override this."""
# Instruments | |
class HP3458A(InstrumentTask, instrument_id='3458A'):
    """Task for the 3458A: keeps the meter auto-calibrated and logs readings."""

    # ACAL is re-run once either limit is reached.
    # Specs are 1C drift or 24h, let's run ACAL slightly before that
    _ACAL_MAX_TEMP_DRIFT = decimal.Decimal('0.9')
    _ACAL_MAX_AGE = datetime.timedelta(hours=23)

    # Time and temperature of the last ACAL; both are set by ``acal``.
    last_cal_time: datetime.datetime
    last_cal_temp: decimal.Decimal

    def temp(self) -> decimal.Decimal:
        """Query ``TEMP?``, store the value as a 'temperature' point and return it."""
        with self.io as io:
            raw = io.ask('TEMP?')
        value = decimal.Decimal(raw)
        db_write(
            influxdb_client.Point('temperature')
            .tag('target', self.instrument_id)
            .time(datetime.datetime.utcnow())
            .field('value', value)
        )
        return value

    def acal(self) -> None:
        """Run ``ACAL DCV`` and record when and at which temperature it was done.

        The ``CAL? 72`` value read back afterwards is stored as a 'CAL72'
        point together with the temperature.
        """
        self.update_message(f'[yellow]running ACAL')
        with self.io as io:
            io.write('ACAL DCV')
        # The lock is not held while waiting, so other tasks sharing it can run.
        time.sleep(2*60 + 45)  # wait to finish — 2m45s (per Keysight's manual)
        self.last_cal_time = datetime.datetime.utcnow()
        self.last_cal_temp = self.temp()
        with self.io as io:
            raw = io.ask('CAL? 72')
        cal72 = decimal.Decimal(raw)
        db_write(
            influxdb_client.Point('CAL72')
            .tag('instrument', self.instrument_id)
            .time(datetime.datetime.utcnow())
            .field('value', cal72)
            .field('temperature', self.last_cal_temp)
        )
        # The Keysight manual says to let the meter settle for 15min after ACAL DCV
        # Though, this seems to be a new addition, and folks on the xdevs IRC
        # say that they haven't experienced any issues taking readings right
        # after ACAL DCV. Perhaps this has to do with a newer relay part being
        # used in the new Keysight branded 3458As?
        #self.update_message(f'[yellow]settling after ACAL')
        #time.sleep(15*60)

    def ensure_acal(self) -> bool:
        """Re-run ACAL when the temperature drift or the ACAL age limit is reached.

        Requires ``acal`` to have run before (``init`` does that).

        Returns:
            True when ACAL was run, False when the last one is still valid.
        """
        temp_drift = abs(self.last_cal_temp - self.temp())
        acal_time_delta = datetime.datetime.utcnow() - self.last_cal_time
        # The drift limit is a Decimal so that a drift of exactly 0.9 triggers
        # ACAL; compared against the float 0.9 it would not.
        if temp_drift >= self._ACAL_MAX_TEMP_DRIFT or acal_time_delta >= self._ACAL_MAX_AGE:
            self.acal()
            return True
        return False

    def init(self) -> None:
        """Send the measurement setup commands, then run an initial ACAL."""
        with self.io as io:
            io.write('END ALWAYS')
            io.write('PRESET')
            io.write('NPLC 100')
            io.write('NDIG 9')
            io.write('TARM AUTO')
        self.acal()

    def read(self) -> None:
        """Ensure ACAL is current, then read one value and store it as 'voltage'."""
        self.ensure_acal()
        self.update_message(f'[green]reading')
        with self.io as io:
            raw = io.read()
        value = decimal.Decimal(raw)
        db_write(
            influxdb_client.Point('voltage')
            .tag('instrument', self.instrument_id)
            .tag('target', 'STD #1')  # front
            .time(datetime.datetime.utcnow())
            .field('value', value)
        )
class S7071(InstrumentTask, instrument_id='S7071'):
    """Task for the S7071: configures the meter and logs its readings."""

    # Status byte value that read() waits for before fetching a reading.
    # NOTE(review): presumably means "reading available" — confirm in the manual.
    _READY_STATUS_BYTE = 24

    def init(self) -> None:
        """Initialise the meter, wait five seconds, then send the setup commands."""
        self.update_message(f'[yellow]initializing')
        with self.io as io:
            io.write('INITIALISE')
        # The lock is released during the pause.
        time.sleep(5)
        setup_commands = (
            'DRIFT=ON',
            'DELIMITER=END',
            'OUTPUT,GP-IB=ON',
            'MEASURE,CONTINUOUS',
            'MODe=VDC: RANge=10: NInes=8',
        )
        with self.io as io:
            for command in setup_commands:
                io.write(command)

    def read(self) -> None:
        """Poll the status byte until it is 24, then read and store one value."""
        self.update_message(f'[green]reading')
        while True:
            # The lock is taken for each poll and released before sleeping.
            with self.io as io:
                ready = io.read_stb() == self._READY_STATUS_BYTE
                raw = io.read() if ready else None
            if ready:
                break
            time.sleep(0.1)
        value = decimal.Decimal(raw)
        point = influxdb_client.Point('voltage')
        point = point.tag('instrument', self.instrument_id)
        point = point.tag('target', 'STD #1')
        point = point.time(datetime.datetime.utcnow())
        point = point.field('value', value)
        db_write(point)
class R6581T(InstrumentTask, instrument_id='R6581T'):
    """Task for the R6581T: keeps the meter auto-calibrated and logs readings."""

    # ACAL is re-run once either limit is reached.
    # Specs are 1C drift or 24h, let's run ACAL slightly before that — same as 3458A
    _ACAL_MAX_TEMP_DRIFT = decimal.Decimal('0.9')
    _ACAL_MAX_AGE = datetime.timedelta(hours=23)

    # Time and temperature of the last ACAL; both are set by ``acal``.
    last_cal_time: datetime.datetime
    last_cal_temp: decimal.Decimal

    def temp(self) -> decimal.Decimal:
        """Query ``ITEMPERATURE?``, store it as a 'temperature' point and return it."""
        with self.io as io:
            raw = io.ask('ITEMPERATURE?')
        value = decimal.Decimal(raw)
        db_write(
            influxdb_client.Point('temperature')
            .tag('target', self.instrument_id)
            .time(datetime.datetime.utcnow())
            .field('value', value)
        )
        return value

    def acal(self) -> None:
        """Run ``CAL:INT:DCV`` and record when and at which temperature it was done."""
        self.update_message(f'[yellow]running ACAL')
        with self.io as io:
            io.write('CAL:INT:DCV')
        # The lock is not held while waiting, so other tasks sharing it can run.
        time.sleep(60 + 40)  # wait to finish, tested to be 1m40s
        self.last_cal_time = datetime.datetime.utcnow()
        self.last_cal_temp = self.temp()

    def ensure_acal(self) -> bool:
        """Re-run ACAL when the temperature drift or the ACAL age limit is reached.

        Requires ``acal`` to have run before (``init`` does that).

        Returns:
            True when ACAL was run, False when the last one is still valid.
        """
        temp_drift = abs(self.last_cal_temp - self.temp())
        acal_time_delta = datetime.datetime.utcnow() - self.last_cal_time
        # The drift limit is a Decimal so that a drift of exactly 0.9 triggers
        # ACAL; compared against the float 0.9 it would not.
        if temp_drift >= self._ACAL_MAX_TEMP_DRIFT or acal_time_delta >= self._ACAL_MAX_AGE:
            self.acal()
            return True
        return False

    def init(self) -> None:
        """Run an initial ACAL."""
        self.acal()
        # TODO: Setup measurement

    def read(self) -> None:
        """Ensure ACAL is current, then query ``READ?`` and store it as 'voltage'."""
        self.ensure_acal()
        self.update_message(f'[green]reading')
        with self.io as io:
            raw = io.ask('READ?')
        value = decimal.Decimal(raw)
        db_write(
            influxdb_client.Point('voltage')
            .tag('instrument', self.instrument_id)
            .tag('target', 'STD #1')  # front
            .time(datetime.datetime.utcnow())
            .field('value', value)
        )
class DMM6500(InstrumentTask, instrument_id='DMM6500'):
    """Task for the DMM6500: logs its readings as the 'ambient' temperature."""

    def init(self) -> None:
        """Nothing is configured yet."""
        # TODO: Setup measurement
        return None

    def read(self) -> None:
        """Query ``READ?``, store the value as a 'temperature' point, then pause 0.5s."""
        self.update_message(f'[green]reading')
        with self.io as io:
            raw = io.ask('READ?')
        value = decimal.Decimal(raw)
        point = influxdb_client.Point('temperature')
        point = point.tag('target', 'ambient')
        point = point.time(datetime.datetime.utcnow())
        point = point.field('value', value)
        db_write(point)
        time.sleep(0.5)

    def quit(self) -> None:
        """Nothing is restored yet."""
        # TODO: Go back to continuous measurement
        return None
# App | |
class MonitorApp(textual.app.App):
    """Textual app showing one table row (name, status dot, message) per instrument.

    Each instrument is driven by its own worker thread running
    ``action_loop``; exceptions raised by an instrument are appended to
    ``error.log`` and shown as a failure in the table.
    """

    BINDINGS = [
        ('ctrl+c', 'quit', 'Quit'),
    ]

    def __init__(self, instruments: list[InstrumentTask]) -> None:
        """Create the app for ``instruments`` and start a fresh ``error.log``."""
        super().__init__()
        self.instruments = instruments
        # Every action loop keeps its worker busy until shutdown, so the pool
        # needs one worker per instrument or the extra loops would never start.
        self.action_executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max(1, len(instruments)),
        )
        self.action_exit_signal = threading.Event()
        # Keeps the log entries of concurrently failing workers from interleaving.
        self._exception_log_lock = threading.Lock()
        # Closed again in on_unmount().
        self.exeception_file = open('error.log', 'w', encoding='utf-8')
        print(f'=== New session: {datetime.datetime.utcnow().isoformat()} ===\n\n', file=self.exeception_file, flush=True)

    def action_exception_handler(self, instrument: InstrumentTask, target: str, exception: Exception) -> None:
        """Log ``exception`` and mark ``instrument`` as failed.

        Args:
            instrument: The task that raised.
            target: Name of the phase that failed ('init', 'read' or 'quit').
            exception: The exception to write to the log file.
        """
        with self._exception_log_lock:
            print(
                f'+++ Exception in instrument: {instrument.instrument_id} ({target}) +++\n',
                file=self.exeception_file,
            )
            traceback.print_exception(exception, file=self.exeception_file)
            print(file=self.exeception_file)
            self.exeception_file.flush()
        instrument.update_status(False)
        instrument.update_message(f'[red]{instrument.instrument_id} failure')

    def action_loop(self, instrument: InstrumentTask) -> None:
        """Worker body: initialise ``instrument``, read until exit, then quit it.

        ``init`` is retried until it succeeds; a failing ``read`` is logged
        and the loop continues. The retry delays wait on the exit signal, so
        a shutdown request is not held up by them.
        """
        while not self.action_exit_signal.is_set():
            try:
                instrument.init()
                break
            except Exception as e:
                self.action_exception_handler(instrument, 'init', e)
                self.action_exit_signal.wait(1)
        while not self.action_exit_signal.is_set():
            try:
                instrument.read()
                instrument.update_status(True)
            except Exception as e:
                self.action_exception_handler(instrument, 'read', e)
                self.action_exit_signal.wait(0.1)
        try:
            instrument.quit()
            instrument.update_status(None)
        except Exception as e:
            self.action_exception_handler(instrument, 'quit', e)

    def update_status(self, update_cell: Callable[[textual.widgets.CellType], None], status: bool | None) -> None:
        """Draw the status dot through ``update_cell``.

        The dot is styled 'green' for a true status, 'red' for a false one
        and 'grey' for ``None``.
        """
        if status is None:
            color = 'grey'
        elif status:
            color = 'green'
        else:
            color = 'red'
        update_cell(rich.text.Text('⚫', style=color, justify='center'))

    def compose(self) -> textual.app.ComposeResult:
        """Yield the single table widget."""
        yield textual.widgets.DataTable(show_cursor=False)

    def on_mount(self) -> None:
        """Fill the table, hook each instrument up to its row and start its worker."""
        table = self.query_one(textual.widgets.DataTable)
        _, status_column_key, info_column_key = table.add_columns('instrument', 'status', 'info')
        for i in self.instruments:
            grey_centered_dot = rich.text.Text('⚫', style='grey', justify='center')
            row_key = table.add_row(i.instrument_id, grey_centered_dot, '[grey]starting')
            # The instrument reports through these hooks; they update its row.
            # NOTE(review): the hooks are called from the worker threads —
            # confirm that updating the table from there is safe.
            update_cell_status = functools.partial(table.update_cell, row_key, status_column_key)
            i.update_status = functools.partial(self.update_status, update_cell_status)
            i.update_message = functools.partial(table.update_cell, row_key, info_column_key)
            self.action_executor.submit(self.action_loop, i)

    def on_unmount(self) -> None:
        """Ask the workers to stop, wait for them and close the log file."""
        self.action_exit_signal.set()
        self.action_executor.shutdown()
        # All workers have finished by now, so nothing writes to the log anymore.
        self.exeception_file.close()
def main() -> None:
    """Create the instrument tasks and run the monitor app."""
    gpib_adapter_address = '10.0.0.173'
    # One lock shared by the three tasks that use the same address.
    gpib_adapter_lock = threading.Lock()
    gpib_tasks = (
        (HP3458A, 'gpib0,01'),
        (S7071, 'gpib0,02'),
        (R6581T, 'gpib0,03'),
    )
    instruments = [
        task_type(vxi11.Instrument(gpib_adapter_address, name=name), gpib_adapter_lock)
        for task_type, name in gpib_tasks
    ]
    # The DMM6500 has its own address and therefore its own (default) lock.
    instruments.append(DMM6500(vxi11.Instrument('10.0.0.111')))
    MonitorApp(instruments).run()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment