Skip to content

Instantly share code, notes, and snippets.

@kageurufu
Created September 13, 2024 17:13
Show Gist options
  • Save kageurufu/a9d56e8a571c235762963aad88303ee3 to your computer and use it in GitHub Desktop.
Save kageurufu/a9d56e8a571c235762963aad88303ee3 to your computer and use it in GitHub Desktop.
Updated E3D Revo PZ Probe calibration script
#!/usr/bin/env python
#
# Original piezo_config.py by E3D-Online
# piezo_cli.py by Franklyn Tackitt <[email protected]>
#
# Requires pyserial
# Optionally requires:
# numpy and scipy for analysis
# plotext for text-mode plotting
"""
Command line utility for configuring the Revo PZ Probe
"""
import argparse
import io
import re
import sys
import textwrap
import time
import importlib.util
import serial
# Probe for the optional packages without importing them, so a missing
# package only disables a feature instead of aborting the script.
have_plotext = importlib.util.find_spec("plotext")
have_numpy = importlib.util.find_spec("numpy") and importlib.util.find_spec("scipy")

# Warn on stderr about each optional feature that will be unavailable.
for _missing, _notice in (
    (not have_numpy, "! scipy is not installed, analysis is not available"),
    (not have_plotext, "! plotext is not installed, graphs will not be shown"),
):
    if _missing:
        print(_notice, file=sys.stderr)
del _missing, _notice
def __doc(func):
    """
    Return the docstring of ``func``, dedented and stripped.

    Returns an empty string when ``func`` has no docstring (for example when
    Python runs with ``-OO``, which strips docstrings) instead of raising a
    TypeError from ``textwrap.dedent(None)``.
    """
    doc = func.__doc__
    if not doc:
        return ""
    return textwrap.dedent(doc).strip()
# Approximate rate at which the probe takes ADC readings. Used to convert
# sample indices to milliseconds and FFT bins to Hz during analysis.
SAMPLE_FREQUENCY = 7788 # Hz
def _read_lines_from(ser, size=1024):
    # type: (serial.Serial, int) -> typing.Generator[bytes, None, None]
    """
    Yield newline-delimited byte strings read from ``ser``.

    ``ser.read(size)`` is called repeatedly. Every complete line is yielded
    without its trailing newline byte. As soon as a read returns no data, any
    unterminated remainder is yielded (if non-empty) and the generator ends.
    """
    pending = b""
    chunk = ser.read(size)
    while chunk:
        pending += chunk
        # Everything before the last newline is complete; the tail is kept
        # until more data (or the end of input) arrives.
        *complete, pending = pending.split(b"\n")
        for line in complete:
            yield line
        chunk = ser.read(size)
    if pending:
        yield pending
def _read_to_end(ser, stream=sys.stdout):
    # type: (serial.Serial, typing.IO) -> None
    """
    Copy every line currently readable from ``ser`` to ``stream``.

    Lines come from ``_read_lines_from``; each one is decoded as UTF-8 and
    written followed by a newline. Reading stops when the port returns no
    more data.
    """
    for line in _read_lines_from(ser):
        # errors="replace": a stray non-UTF-8 byte on the serial line should
        # show up as a replacement character, not abort the whole command.
        stream.write(line.decode("utf-8", errors="replace") + "\n")
def _status(ser, args):
    # type: (serial.Serial, argparse.Namespace) -> None
    """
    Report the current PZ Probe Status
    Example:
    Current Button State is 3, with settings:
    THRESHOLD: 1, TRIGGER DIRECTION: 1
    FILTER ACTIVE: 0, F-Fc: 260, F-BW: 50
    SAFETY TRIGGER: 1
    """
    # Send the query, then echo whatever the probe answers to stdout.
    # ``args`` is unused; it is accepted so every command shares one signature.
    request = b"STATUS\n"
    ser.write(request)
    _read_to_end(ser)
def _update_preset(ser, args):
    # type: (serial.Serial, argparse.Namespace) -> None
    """
    Reconfigure a preset, updating the configuration values
    """
    # Each field of the SET command paired with its integer format spec,
    # in the order they appear on the wire.
    fields = (
        ("{:01d}", args.preset),
        ("{:03d}", args.trigger_threshold),
        ("{:01d}", args.trigger_direction),
        ("{:01d}", args.filter_active),
        ("{:03d}", args.filter_centre_frequency),
        ("{:03d}", args.filter_bandwidth),
        ("{:01d}", args.safety_trigger_active),
    )
    payload = ",".join(spec.format(value) for spec, value in fields)
    ser.write("SET {}\n".format(payload).encode())
    # Echo the probe's response to stdout.
    _read_to_end(ser)
def _select_preset(ser, args):
    # type: (serial.Serial, argparse.Namespace) -> None
    """
    Select a preset (from 0-7)
    """
    # A SET command carrying only the preset number, followed by echoing the
    # probe's response to stdout.
    command = "SET {:d}\n".format(args.preset)
    ser.write(command.encode())
    _read_to_end(ser)
def _analyze(_ser, args):
    """
    Analyze a series of values recorded from rec
    """
    # numpy is imported here rather than at module level because it is an
    # optional dependency. ``_ser`` is unused: the data comes from a file.
    from numpy import loadtxt

    _analyze_values(loadtxt(args.file))
def _analyze_values(values, marks=None):
    """
    Print a frequency analysis of recorded ADC readings and, when plotext is
    available, plot the raw data together with its power spectral density.

    values: sequence (or numpy array) of numeric readings in recording order.
    marks: optional iterable of ``(sample_index, y, label)`` tuples; each one
        is drawn as a labelled vertical line on the raw-data plot.

    Nothing is returned; results are printed to stdout.
    """
    import numpy as np
    # Import the submodule explicitly: a bare "import scipy" does not
    # guarantee that scipy.signal is loaded on every SciPy version.
    from scipy import signal

    if have_plotext:
        import plotext

    # None instead of a shared mutable default list.
    if marks is None:
        marks = ()

    data = np.atleast_1d(np.asarray(values, dtype=float))
    print("Analyzing recording, {} values".format(len(data)))
    if len(data) == 0:
        # argmax/FFT below are undefined for an empty recording.
        print("! no values to analyze", file=sys.stderr)
        return

    all_peaks, _ = signal.find_peaks(data)
    peak_index = np.argmax(np.abs(data))
    peak = data[peak_index]

    # Dominant FFT bin, converted from cycles/sample to Hz.
    waves = np.fft.fft(data)
    frequencies = np.fft.fftfreq(len(waves))
    frequency_index = np.argmax(np.abs(waves))
    frequency = frequencies[frequency_index]
    frequency_in_hz = abs(frequency * SAMPLE_FREQUENCY)

    print("ADC readings peaked at {:0.1f}".format(peak))
    if frequency_in_hz:
        print("Detected oscillation at {:0.1f}Hz".format(frequency_in_hz))

    # Welch power spectral density estimate and the frequency of its maximum.
    f_w, P_w = signal.welch(data, SAMPLE_FREQUENCY, scaling="density")
    welch_peak_index = np.argmax(P_w)
    welch_frequency = f_w[welch_peak_index]
    print("Spectral freqencies peak at {:0.1f}Hz".format(welch_frequency))

    # Milliseconds per sample, used to place samples on the time axis.
    adc_fscale = 1000 / SAMPLE_FREQUENCY

    if have_plotext:
        # Plot ADC values; the time axis follows the actual number of
        # samples rather than assuming exactly 1000 readings.
        plotext.plot(
            np.arange(len(data)) * adc_fscale,
            data,
            xside="upper",
            yside="left",
            label="Raw Data",
            marker="braille",
        )
        plotext.xlabel("t(ms)", xside="upper")
        plotext.ylabel("ADC value", yside="left")

        # Mark every local peak when there are few of them, otherwise only
        # the sample with the largest absolute value.
        if len(all_peaks) < 30:
            for peak_idx in all_peaks:
                plotext.vline(peak_idx * adc_fscale, color='yellow', xside='upper')
        else:
            plotext.vline(peak_index * adc_fscale, color='', xside='upper')

        plotext.plot(f_w, P_w, label="Spectral Frequency", yside="right", xside="lower")
        plotext.vline(welch_frequency, color="white", xside="lower")
        plotext.text(
            "{:0.1f}Hz".format(welch_frequency),
            x=welch_frequency,
            y=np.min(P_w),
            xside="lower",
            yside="right",
            color="magenta",
        )
        plotext.xlabel("Hz", xside="lower")
        plotext.yscale("log", yside="right")
        plotext.ylabel("Power density", yside="right")

        for (x, y, label) in marks:
            plotext.vline(x * adc_fscale, xside='upper')
            plotext.text(label, x * adc_fscale, y, xside='upper', yside='left')

        plotext.theme("clear")
        plotext.plot_size(plotext.terminal_width(), 30)
        plotext.show()
def _report_adc(ser, args):
    # type: (serial.Serial, argparse.Namespace) -> None
    """
    Record 1000 readings
    Troubleshooting command used for isolating oscillating noise frequencies
    Readings are taken at approximately 7788Hz
    """
    # --analyze is only registered when numpy/scipy are available, so the
    # attribute may be missing from the namespace.
    analyze = getattr(args, "analyze", False)

    # Readings go to the --out file if given, to stdout when not analyzing,
    # and nowhere (analysis only) otherwise.
    out_file = open(args.out, "w") if args.out else None
    if out_file is not None:
        stream = out_file
    elif analyze:
        stream = None
    else:
        stream = sys.stdout

    values = []
    try:
        ser.write(("REPORT ADC\n").encode())
        time.sleep(0.1)
        ser.timeout = 1
        ser.inter_byte_timeout = 0.5
        count = 0
        for line in _read_lines_from(ser, 2):
            if b"reported ADC values:" in line:
                print(line.decode("utf-8", errors="replace"))
                continue
            if b"Report end." in line:
                print(line.decode("utf-8", errors="replace"))
                break
            # Strip line-ending residue and skip blank lines so they are
            # neither counted as readings nor fed to float() later.
            value = line.decode("utf-8", errors="replace").strip()
            if not value:
                continue
            count += 1
            if stream is not sys.stdout:
                # Progress counter, overwritten in place.
                print(count, end="\r")
            values.append(value)
            if stream:
                print(value, file=stream)
    finally:
        # Close the output file even if reading is interrupted.
        if out_file is not None:
            out_file.close()

    if analyze:
        _analyze_values([float(v) for v in values])
def _record_trigger(ser, args):
    # type: (serial.Serial, argparse.Namespace) -> None
    """
    Report 1000 readings, 100 before and 900 after triggering
    Readings are taken at approximately 7788Hz
    """
    # --analyze is only registered when numpy/scipy are available, so the
    # attribute may be missing from the namespace.
    analyze = getattr(args, "analyze", False)

    # Readings go to the --out file if given, to stdout when not analyzing,
    # and nowhere (analysis only) otherwise.
    out_file = open(args.out, "w") if args.out else None
    if out_file is not None:
        stream = out_file
    elif analyze:
        stream = None
    else:
        stream = sys.stdout

    values = []
    count = 0
    try:
        ser.write(("REC TRIG\n").encode())
        time.sleep(0.1)
        ser.timeout = 2
        ser.inter_byte_timeout = 0.5

        # Wait for the header that precedes the readings. _read_lines_from
        # finishes at the first read that times out, so a new generator is
        # started for every attempt; reusing an exhausted one would spin
        # forever without ever reading the port again.
        header_seen = False
        while not header_seen:
            lines_iter = _read_lines_from(ser, 1)
            for line in lines_iter:
                if line:
                    print(line.decode("utf-8", errors="replace"))
                if b"Readings collected were:" in line:
                    header_seen = True
                    break

        # Keep consuming the same generator: the readings follow the header.
        for line in lines_iter:
            if b"Report end." in line:
                print(line.decode("utf-8", errors="replace"))
                break
            # Strip line-ending residue and skip blank lines so they are
            # neither counted as readings nor fed to float() later.
            value = line.decode("utf-8", errors="replace").strip()
            if not value:
                continue
            count += 1
            if stream is not sys.stdout:
                # Progress counter, overwritten in place.
                print(count, end="\r")
            values.append(value)
            if stream:
                print(value, file=stream)
    finally:
        # Close the output file even if waiting is interrupted (Ctrl-C).
        if out_file is not None:
            out_file.close()

    print("Received {} values".format(count))
    if analyze:
        # Sample 100 is marked as the trigger point on the plot.
        _analyze_values([float(v) for v in values], marks=[(100, -10, "Trigger")])
def _dump_presets(ser, args):
    # type: (serial.Serial, argparse.Namespace) -> None
    """
    List all configured presets
    """
    # Remember which preset is active so it can be restored afterwards:
    # selecting each preset in turn is how their settings are read out.
    ser.write("STATUS\n".encode())
    current_status = io.StringIO()
    _read_to_end(ser, current_status)
    found = re.search(
        r"Current Button State is (\d)",
        current_status.getvalue(),
    )
    if found is None:
        raise RuntimeError(
            "Could not determine the current preset from the STATUS response: {!r}".format(
                current_status.getvalue()
            )
        )
    current_preset = found.group(1)

    try:
        for i in range(0, 8):
            ser.write(("SET {:d}\n".format(i)).encode())
            preset = io.StringIO()
            _read_to_end(ser, stream=preset)
            print("Preset {}:".format(i))
            # Drop the first line of the response and show the rest indented;
            # partition() yields "" instead of raising when nothing came back.
            _, _, settings = preset.getvalue().partition("\n")
            print(textwrap.indent(settings, prefix="\t"))
    finally:
        # Always put the probe back on the preset it started with, and drain
        # the response so it does not leak into later output.
        ser.write(("SET {}\n".format(current_preset)).encode())
        _read_to_end(ser, stream=current_status)
# Command line interface: one sub-command per probe operation. Each
# sub-parser stores the function implementing it in ``args.action``.
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--device", "-d", default="/dev/serial0", help="Serial device name")
# The default matches the read timeout the port was previously always opened
# with, so behaviour is unchanged unless --timeout is given.
parser.add_argument(
    "--timeout", default=0.1, type=float, help="Serial read timeout in seconds"
)
# dest is set so argparse can name the missing argument when no command is
# given (required sub-parsers without a dest fail on some Python versions).
subparsers = parser.add_subparsers(dest="command", required=True, help="Commands")

parser_dump = subparsers.add_parser("list_presets", help=__doc(_dump_presets))
parser_dump.set_defaults(action=_dump_presets)

parser_status = subparsers.add_parser("status", help=__doc(_status))
parser_status.set_defaults(action=_status)

parser_set = subparsers.add_parser("preset", help=__doc(_select_preset))
parser_set.set_defaults(action=_select_preset)
parser_set.add_argument("preset", type=int, help="Preset from 0-7")

parser_update = subparsers.add_parser("set", help=__doc(_update_preset))
parser_update.set_defaults(action=_update_preset)
parser_update.add_argument("preset", type=int, help="Preset from 0-7")
parser_update.add_argument(
    "trigger_threshold", type=int, help="Trigger threshold (-400 to 400)"
)
parser_update.add_argument(
    "filter_centre_frequency",
    type=int,
    help="Filter centre frequency in Hz (0 to 500)",
)
parser_update.add_argument(
    "filter_bandwidth", type=int, help="Filter bandwidth in Hz (0 to 500)"
)
parser_update.add_argument(
    "--no-safety-trigger",
    dest="safety_trigger_active",
    action="store_const",
    default=1,
    const=0,
    help="Disable the safety trigger",
)
parser_update.add_argument(
    "--filter",
    dest="filter_active",
    action="store_const",
    default=0,
    const=1,
    help="Enable the filter",
)
parser_update.add_argument(
    "--trigger-falling-edge",
    dest="trigger_direction",
    action="store_const",
    default=1,
    const=0,
    help="Trigger on the falling edge, default is rising edge",
)

parser_report_adc = subparsers.add_parser("report_adc", help=__doc(_report_adc))
parser_report_adc.set_defaults(action=_report_adc)
parser_report_adc.add_argument("--out", "-o", help="Save report to a csv file")

parser_rec_trig = subparsers.add_parser("record_trigger", help=__doc(_record_trigger))
parser_rec_trig.set_defaults(action=_record_trigger)
parser_rec_trig.add_argument("--out", "-o", help="Save report to a csv file")

# Analysis options and the offline "analyze" command exist only when
# numpy and scipy were found.
if have_numpy:
    parser_report_adc.add_argument(
        "--analyze",
        action="store_true",
        help="Analyze recorded values for oscillating frequencies",
    )
    parser_rec_trig.add_argument(
        "--analyze",
        action="store_true",
        help="Analyze recorded values for oscillating frequencies",
    )
    parser_analyze = subparsers.add_parser("analyze", help=__doc(_analyze))
    parser_analyze.set_defaults(action=_analyze)
    parser_analyze.add_argument("file", help="Recording output to analyze")

if __name__ == "__main__":
    args = parser.parse_args()
    if args.action is _analyze:
        # Offline analysis only reads a file; do not require a connected probe.
        args.action(None, args)
    else:
        # The context manager closes the port even if the command fails.
        with serial.Serial(
            args.device, 9600, timeout=args.timeout, inter_byte_timeout=0.1
        ) as ser:
            args.action(ser, args)
    sys.exit(0)
@Killajoedotcom
Copy link

Frank, How do I use this?

@kageurufu
Copy link
Author

Easiest, create a new virtualenv.

python3 -m venv ~/piezo-env
~/piezo-env/bin/pip install pyserial numpy scipy plotext

~/piezo-env/bin/python piezo_cli.py status

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment