Created
September 13, 2024 17:13
-
-
Save kageurufu/a9d56e8a571c235762963aad88303ee3 to your computer and use it in GitHub Desktop.
Updated E3D Revo PZ Probe calibration script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Original piezo_config.py by E3D-Online | |
# piezo_cli.py by Franklyn Tackitt <[email protected]> | |
# | |
# Requires pyserial | |
# Optionally requires: | |
# numpy and scipy for analysis | |
# plotext for text-mode plotting | |
""" | |
Command line utility for the configuring the Revo PZ Probe | |
""" | |
import argparse | |
import io | |
import re | |
import sys | |
import textwrap | |
import time | |
import importlib.util | |
import serial | |
have_numpy = importlib.util.find_spec('numpy') and importlib.util.find_spec('scipy') | |
have_plotext = importlib.util.find_spec('plotext') | |
if not have_numpy: | |
print("! scipy is not installed, analysis is not available", file=sys.stderr) | |
if not have_plotext: | |
print("! plotext is not installed, graphs will not be shown", file=sys.stderr) | |
def __doc(func): | |
return textwrap.dedent(func.__doc__).strip() | |
SAMPLE_FREQUENCY = 7788 # Hz | |
def _read_lines_from(ser, size=1024): | |
# type: (serial.Serial) -> typing.Generator[bytes, None, None] | |
""" | |
Read from a serial port until the read buffer is empty and no more data is returned | |
""" | |
buffer = b"" | |
while True: | |
data = ser.read(size) | |
if not data: | |
if buffer: | |
yield buffer | |
return | |
buffer += data | |
while b"\n" in buffer: | |
line, buffer = buffer.split(b"\n", maxsplit=1) | |
yield line | |
def _read_to_end(ser, stream=sys.stdout): | |
# type: (serial.Serial, typing.IO) -> None | |
for line in _read_lines_from(ser): | |
stream.write(line.decode("utf-8") + "\n") | |
def _status(ser, args): | |
# type: (serial.Serial, argparse.Namespace) -> None | |
""" | |
Report the current PZ Probe Status | |
Example: | |
Current Button State is 3, with settings: | |
THRESHOLD: 1, TRIGGER DIRECTION: 1 | |
FILTER ACTIVE: 0, F-Fc: 260, F-BW: 50 | |
SAFETY TRIGGER: 1 | |
""" | |
ser.write("STATUS\n".encode()) | |
_read_to_end(ser) | |
def _update_preset(ser, args): | |
# type: (serial.Serial, argparse.Namespace) -> None | |
""" | |
Reconfigure a preset, updating the configuration values | |
""" | |
command = "SET {:01d},{:03d},{:01d},{:01d},{:03d},{:03d},{:01d}\n".format( | |
args.preset, | |
args.trigger_threshold, | |
args.trigger_direction, | |
args.filter_active, | |
args.filter_centre_frequency, | |
args.filter_bandwidth, | |
args.safety_trigger_active, | |
) | |
ser.write((command).encode()) | |
_read_to_end(ser) | |
def _select_preset(ser, args): | |
# type: (serial.Serial, argparse.Namespace) -> None | |
""" | |
Select a preset (from 0-7) | |
""" | |
ser.write(("SET {:d}\n".format(args.preset)).encode()) | |
_read_to_end(ser) | |
def _analyze(_ser, args): | |
""" | |
Analyze a series of values recorded from rec | |
""" | |
import numpy as np | |
values = np.loadtxt(args.file) | |
_analyze_values(values) | |
def _analyze_values(values, marks=[]): | |
import numpy as np | |
import scipy as sp | |
if have_plotext: | |
import plotext | |
print("Analyzing recording, {} values".format(len(values))) | |
data = np.array(values) | |
all_peaks, _ = sp.signal.find_peaks(data) | |
peak_index = np.argmax(np.abs(data)) | |
peak = data[peak_index] | |
waves = np.fft.fft(data) | |
frequencies = np.fft.fftfreq(len(waves)) | |
frequency_index = np.argmax(np.abs(waves)) | |
frequency = frequencies[frequency_index] | |
frequency_in_hz = abs(frequency * SAMPLE_FREQUENCY) | |
print("ADC readings peaked at {:0.1f}".format(peak)) | |
if frequency_in_hz: | |
print("Detected oscillation at {:0.1f}Hz".format(frequency_in_hz)) | |
f_w, P_w = sp.signal.welch(data, SAMPLE_FREQUENCY, scaling="density") | |
welch_peak_index = np.argmax(P_w) | |
welch_frequency = f_w[welch_peak_index] | |
print("Spectral freqencies peak at {:0.1f}Hz".format(welch_frequency)) | |
adc_fscale = 1000 / SAMPLE_FREQUENCY | |
if have_plotext: | |
# Plot ADC values | |
plotext.plot( | |
np.arange(1000) * adc_fscale, | |
data, | |
xside="upper", | |
yside="left", | |
label="Raw Data", | |
marker="braille", | |
) | |
plotext.xlabel("t(ms)", xside="upper") | |
plotext.ylabel("ADC value", yside="left") | |
if len(all_peaks) < 30: | |
for peak_idx in all_peaks: | |
plotext.vline(peak_idx * adc_fscale, color='yellow', xside='upper') | |
else: | |
plotext.vline(peak_index * adc_fscale, color='', xside='upper') | |
plotext.plot(f_w, P_w, label="Spectral Frequency", yside="right", xside="lower") | |
plotext.vline(welch_frequency, color="white", xside="lower") | |
plotext.text("{:0.1f}Hz".format(welch_frequency), x=welch_frequency, y=np.min(P_w), xside="lower", yside="right", color="magenta") | |
plotext.xlabel("Hz", xside="lower") | |
plotext.yscale("log", yside="right") | |
plotext.ylabel("Power density", yside="right") | |
for (x, y, label) in marks: | |
plotext.vline(x * adc_fscale, xside='upper') | |
plotext.text(label, x * adc_fscale, y, xside='upper', yside='left') | |
plotext.theme("clear") | |
plotext.plot_size(plotext.terminal_width(), 30) | |
plotext.show() | |
def _report_adc(ser, args): | |
# type: (serial.Serial, argparse.Namespace) -> None | |
""" | |
Record 1000 readings | |
Troubleshooting command used for isolating oscillating noise frequencies | |
Readings are taken at approximately 7788Hz | |
""" | |
stream = None | |
if args.out: | |
stream = open(args.out, "w") | |
elif not args.analyze: | |
stream = sys.stdout | |
ser.write(("REPORT ADC\n").encode()) | |
time.sleep(0.1) | |
ser.timeout = 1 | |
ser.inter_byte_timeout = 0.5 | |
count = 0 | |
values = [] | |
for line in _read_lines_from(ser, 2): | |
if b"reported ADC values:" in line: | |
print(line.decode("utf-8")) | |
continue | |
if b"Report end." in line: | |
print(line.decode("utf-8")) | |
break | |
count += 1 | |
if stream is not sys.stdout: | |
print(count, end="\r") | |
value = line.decode("utf-8") | |
values.append(value) | |
if stream: | |
print(value, file=stream) | |
if args.analyze: | |
_analyze_values([float(v) for v in values]) | |
def _record_trigger(ser, args): | |
# type: (serial.Serial, argparse.Namespace) -> None | |
""" | |
Report 1000 readings, 100 before and 900 after triggering | |
Readings are taken at approximately 7788Hz | |
""" | |
stream = None | |
if args.out: | |
stream = open(args.out, "w") | |
elif not args.analyze: | |
stream = sys.stdout | |
ser.write(("REC TRIG\n").encode()) | |
time.sleep(0.1) | |
ser.timeout = 2 | |
ser.inter_byte_timeout = 0.5 | |
lines_iter = _read_lines_from(ser, 1) | |
while True: | |
line = next(lines_iter, None) | |
if line: | |
print(line.decode("utf-8")) | |
if b"Readings collected were:" in line: | |
break | |
count = 0 | |
values = [] | |
for line in lines_iter: | |
if b"Report end." in line: | |
print(line.decode("utf-8")) | |
break | |
count += 1 | |
if stream is not sys.stdout: | |
print(count, end="\r") | |
value = line.decode("utf-8") | |
values.append(value) | |
if stream: | |
print(value, file=stream) | |
print("Received {} values".format(count)) | |
if args.analyze: | |
_analyze_values([float(v) for v in values], marks=[(100, -10, "Trigger")]) | |
def _dump_presets(ser, args): | |
# type: (serial.Serial, argparse.Namespace) -> None | |
""" | |
List all configured presets | |
""" | |
ser.write("STATUS\n".encode()) | |
current_status = io.StringIO() | |
_read_to_end(ser, current_status) | |
current_preset = re.match( | |
r"Current Button State is (\d)", | |
current_status.getvalue(), | |
).group(1) | |
for i in range(0, 8): | |
ser.write(("SET {:d}\n".format(i)).encode()) | |
preset = io.StringIO() | |
_read_to_end(ser, stream=preset) | |
print("Preset {}:".format(i)) | |
print( | |
textwrap.indent(preset.getvalue().split("\n", maxsplit=1)[1], prefix="\t") | |
) | |
ser.write(("SET {}\n".format(current_preset)).encode()) | |
_read_to_end(ser, stream=current_status) | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument("--device", "-d", default="/dev/serial0", help="Serial device name") | |
parser.add_argument("--timeout", default=1.0, type=float, help="Serial device name") | |
subparsers = parser.add_subparsers(required=True, help="Commands") | |
parser_dump = subparsers.add_parser("list_presets", help=__doc(_dump_presets)) | |
parser_dump.set_defaults(action=_dump_presets) | |
parser_status = subparsers.add_parser("status", help=__doc(_status)) | |
parser_status.set_defaults(action=_status) | |
parser_set = subparsers.add_parser("preset", help=__doc(_select_preset)) | |
parser_set.set_defaults(action=_select_preset) | |
parser_set.add_argument("preset", type=int, help="Preset from 0-7") | |
parser_update = subparsers.add_parser("set", help=__doc(_update_preset)) | |
parser_update.set_defaults(action=_update_preset) | |
parser_update.add_argument("preset", type=int, help="Preset from 0-7") | |
parser_update.add_argument( | |
"trigger_threshold", type=int, help="Trigger threshold (-400 to 400)" | |
) | |
parser_update.add_argument( | |
"filter_centre_frequency", | |
type=int, | |
help="Filter centre frequency in Hz (0 to 500)", | |
) | |
parser_update.add_argument( | |
"filter_bandwidth", type=int, help="Filter bandwidth in Hz (0 to 500)" | |
) | |
parser_update.add_argument( | |
"--no-safety-trigger", | |
dest="safety_trigger_active", | |
action="store_const", | |
default=1, | |
const=0, | |
help="Disable the safety trigger", | |
) | |
parser_update.add_argument( | |
"--filter", | |
dest="filter_active", | |
action="store_const", | |
default=0, | |
const=1, | |
help="Enable the filter", | |
) | |
parser_update.add_argument( | |
"--trigger-falling-edge", | |
dest="trigger_direction", | |
action="store_const", | |
default=1, | |
const=0, | |
help="Trigger on the falling edge, default is rising edge", | |
) | |
parser_report_adc = subparsers.add_parser("report_adc", help=__doc(_report_adc)) | |
parser_report_adc.set_defaults(action=_report_adc) | |
parser_report_adc.add_argument("--out", "-o", help="Save report to a csv file") | |
parser_rec_trig = subparsers.add_parser("record_trigger", help=__doc(_record_trigger)) | |
parser_rec_trig.set_defaults(action=_record_trigger) | |
parser_rec_trig.add_argument("--out", "-o", help="Save report to a csv file") | |
if have_numpy: | |
parser_report_adc.add_argument( | |
"--analyze", | |
action="store_true", | |
help="Analyze recorded values for oscillating frequencies", | |
) | |
parser_rec_trig.add_argument( | |
"--analyze", | |
action="store_true", | |
help="Analyze recorded values for oscillating frequencies", | |
) | |
parser_analyze = subparsers.add_parser("analyze", help=__doc(_analyze)) | |
parser_analyze.set_defaults(action=_analyze) | |
parser_analyze.add_argument("file", help="Recording output to analyze") | |
if __name__ == "__main__": | |
args = parser.parse_args() | |
ser = serial.Serial(args.device, 9600, timeout=0.1, inter_byte_timeout=0.1) | |
args.action(ser, args) | |
sys.exit(0) |
Easiest, create a new virtualenv.
python3 -m venv ~/piezo-env
~/piezo-env/bin/pip install pyserial numpy scipy plotext
~/piezo-env/bin/python piezo_cli.py status
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Frank, How do I use this?