Skip to content

Instantly share code, notes, and snippets.

@FFY00
Last active February 7, 2024 01:51
Show Gist options
  • Save FFY00/19d794e54a9683c801e04404a39dc9bb to your computer and use it in GitHub Desktop.
Save FFY00/19d794e54a9683c801e04404a39dc9bb to your computer and use it in GitHub Desktop.
from __future__ import annotations
import concurrent.futures
import contextlib
import datetime
import decimal
import functools
import time
import threading
import traceback
from collections.abc import Iterator
from typing import Callable, ClassVar
import influxdb_client
import rich.text
import vxi11
import textual.app
import textual.widgets
# DB
db_client = influxdb_client.InfluxDBClient(
url='http://localhost:8086',
token='...',
)
db_write_api = db_client.write_api(write_options=influxdb_client.client.write_api.SYNCHRONOUS)
def db_write(point: influxdb_client.Point) -> None:
db_write_api.write(
bucket='...',
org='...',
record=point,
)
# Types
class InstrumentTask:
instrument_id: ClassVar[str]
def __init_subclass__(cls, instrument_id: str, **kwargs: dict) -> None:
super().__init_subclass__(**kwargs)
cls.instrument_id = instrument_id
def __init__(
self, io: vxi11.Instrument,
io_lock: threading.Lock | None = None,
) -> None:
self._io = io
self._io_lock = io_lock or threading.Lock()
@property
@contextlib.contextmanager
def io(self) -> Iterator[vxi11.Instrument]:
with self._io_lock:
self._io.open()
yield self._io
self._io.close()
def update_status(status: bool) -> None:
pass
def update_message(message: textual.widgets.CellType) -> None:
pass
def init(self) -> None:
...
def read(self) -> None:
...
def quit(self) -> None:
...
# Instruments
class HP3458A(InstrumentTask, instrument_id='3458A'):
def temp(self) -> decimal.Decimal:
with self.io as io:
raw = io.ask('TEMP?')
value = decimal.Decimal(raw)
db_write(
influxdb_client.Point('temperature')
.tag('target', self.instrument_id)
.time(datetime.datetime.utcnow())
.field('value', value)
)
return value
def acal(self) -> None:
self.update_message(f'[yellow]running ACAL')
with self.io as io:
io.write('ACAL DCV')
time.sleep(2*60 + 45) # wait to finish — 2m45s (per Keysight's manual)
self.last_cal_time = datetime.datetime.utcnow()
self.last_cal_temp = self.temp()
with self.io as io:
raw = io.ask('CAL? 72')
cal72 = decimal.Decimal(raw)
db_write(
influxdb_client.Point('CAL72')
.tag('instrument', self.instrument_id)
.time(datetime.datetime.utcnow())
.field('value', cal72)
.field('temperature', self.last_cal_temp)
)
# The Keysight manual says to let the meter settle for 15min after ACAL DCV
# Though, this seems to be a new addition, and folks on the xdevs IRC
# say that they haven't experienced any issues taking readings right
# after ACAL DCV. Perhaps this has to do with a newer relay part being
# used in the new Keysight branded 3458As?
#self.update_message(f'[yellow]settling after ACAL')
#time.sleep(15*60)
def ensure_acal(self) -> bool:
temp_drift = abs(self.last_cal_temp - self.temp())
acal_time_delta = datetime.datetime.utcnow() - self.last_cal_time
# Specs are 1C drift or 24h, let's run ACAL slightly before that
if temp_drift >= 0.9 or acal_time_delta >= datetime.timedelta(hours=23):
self.acal()
def init(self) -> None:
with self.io as io:
io.write('END ALWAYS')
io.write('PRESET')
io.write('NPLC 100')
io.write('NDIG 9')
io.write('TARM AUTO')
self.acal()
def read(self) -> None:
self.ensure_acal()
self.update_message(f'[green]reading')
with self.io as io:
raw = io.read()
value = decimal.Decimal(raw)
db_write(
influxdb_client.Point('voltage')
.tag('instrument', self.instrument_id)
.tag('target', 'STD #1') # front
.time(datetime.datetime.utcnow())
.field('value', value)
)
class S7071(InstrumentTask, instrument_id='S7071'):
def init(self) -> None:
self.update_message(f'[yellow]initializing')
with self.io as io:
io.write('INITIALISE')
time.sleep(5)
with self.io as io:
io.write('DRIFT=ON')
io.write('DELIMITER=END')
io.write('OUTPUT,GP-IB=ON')
io.write('MEASURE,CONTINUOUS')
io.write('MODe=VDC: RANge=10: NInes=8')
def read(self) -> None:
self.update_message(f'[green]reading')
while True:
with self.io as io:
if io.read_stb() == 24:
raw = io.read()
break
time.sleep(0.1)
value = decimal.Decimal(raw)
db_write(
influxdb_client.Point('voltage')
.tag('instrument', self.instrument_id)
.tag('target', 'STD #1')
.time(datetime.datetime.utcnow())
.field('value', value)
)
class R6581T(InstrumentTask, instrument_id='R6581T'):
def temp(self) -> decimal.Decimal:
with self.io as io:
raw = io.ask('ITEMPERATURE?')
value = decimal.Decimal(raw)
db_write(
influxdb_client.Point('temperature')
.tag('target', self.instrument_id)
.time(datetime.datetime.utcnow())
.field('value', value)
)
return value
def acal(self) -> None:
self.update_message(f'[yellow]running ACAL')
with self.io as io:
io.write('CAL:INT:DCV')
time.sleep(60 + 40) # wait to finish, tested to be 1m40s
self.last_cal_time = datetime.datetime.utcnow()
self.last_cal_temp = self.temp()
def ensure_acal(self) -> bool:
temp_drift = abs(self.last_cal_temp - self.temp())
acal_time_delta = datetime.datetime.utcnow() - self.last_cal_time
# Specs are 1C drift or 24h, let's run ACAL slightly before that — same as 3458A
if temp_drift >= 0.9 or acal_time_delta >= datetime.timedelta(hours=23):
self.acal()
def init(self) -> None:
self.acal()
# TODO: Setup measurement
def read(self) -> None:
self.ensure_acal()
self.update_message(f'[green]reading')
with self.io as io:
raw = io.ask('READ?')
value = decimal.Decimal(raw)
db_write(
influxdb_client.Point('voltage')
.tag('instrument', self.instrument_id)
.tag('target', 'STD #1') # front
.time(datetime.datetime.utcnow())
.field('value', value)
)
class DMM6500(InstrumentTask, instrument_id='DMM6500'):
def init(self) -> None:
pass # TODO: Setup meaurement
def read(self) -> None:
self.update_message(f'[green]reading')
with self.io as io:
raw = io.ask('READ?')
value = decimal.Decimal(raw)
db_write(
influxdb_client.Point('temperature')
.tag('target', 'ambient')
.time(datetime.datetime.utcnow())
.field('value', value)
)
time.sleep(0.5)
def quit(self) -> None:
pass # TODO: Go back to continuous measurement
# App
class MonitorApp(textual.app.App):
BINDINGS = [
('ctrl+c', 'quit', 'Quit'),
]
def __init__(self, instruments: list[InstrumentTask]) -> None:
super().__init__()
self.instruments = instruments
self.action_executor = concurrent.futures.ThreadPoolExecutor()
self.action_exit_signal = threading.Event()
self.exeception_file = open('error.log', 'w')
print(f'=== New session: {datetime.datetime.utcnow().isoformat()} ===\n\n', file=self.exeception_file, flush=True)
def action_exception_handler(self, instrument: InstrumentTask, target: str, exception: Exception) -> None:
print(f'+++ Exception in instrument: {instrument.instrument_id} +++\n', file=self.exeception_file)
traceback.print_exception(exception, file=self.exeception_file)
print(file=self.exeception_file)
self.exeception_file.flush()
instrument.update_status(False)
instrument.update_message(f'[red]{instrument.instrument_id} failure')
def action_loop(self, instrument: InstrumentTask) -> None:
while not self.action_exit_signal.is_set():
try:
instrument.init()
break
except Exception as e:
self.action_exception_handler(instrument, 'init', e)
time.sleep(1)
while not self.action_exit_signal.is_set():
try:
instrument.read()
instrument.update_status(True)
except Exception as e:
self.action_exception_handler(instrument, 'read', e)
time.sleep(0.1)
try:
instrument.quit()
instrument.update_status(None)
except Exception as e:
self.action_exception_handler(instrument, 'quit', e)
def update_status(self, update_cell: Callable[[textual.widgets.CellType], None], status: bool | None) -> None:
if status is None:
color = 'grey'
elif status:
color = 'green'
else:
color = 'red'
update_cell(rich.text.Text('⚫', style=color, justify='center'))
def compose(self) -> textual.app.ComposeResult:
yield textual.widgets.DataTable(show_cursor=False)
def on_mount(self) -> None:
table = self.query_one(textual.widgets.DataTable)
_, status_column_key, info_column_key = table.add_columns('instrument', 'status', 'info')
for i in self.instruments:
grey_centered_dot = rich.text.Text('⚫', style='grey', justify='center')
row_key = table.add_row(i.instrument_id, grey_centered_dot, '[grey]starting')
update_cell_status = functools.partial(table.update_cell, row_key, status_column_key)
i.update_status = functools.partial(self.update_status, update_cell_status)
i.update_message = functools.partial(table.update_cell, row_key, info_column_key)
self.action_executor.submit(self.action_loop, i)
def on_unmount(self) -> None:
self.action_exit_signal.set()
self.action_executor.shutdown()
def main() -> None:
gpib_adapter_lock = threading.Lock()
instruments = [
HP3458A(vxi11.Instrument('10.0.0.173', name='gpib0,01'), gpib_adapter_lock),
S7071(vxi11.Instrument('10.0.0.173', name='gpib0,02'), gpib_adapter_lock),
R6581T(vxi11.Instrument('10.0.0.173', name='gpib0,03'), gpib_adapter_lock),
DMM6500(vxi11.Instrument('10.0.0.111')),
]
app = MonitorApp(instruments)
app.run()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment