-
-
Save pklaus/0a799921217bc9a7d86f to your computer and use it in GitHub Desktop.
Python script to send commands to a Rigol scope (or any LXI/SCPI instrument) from first principles.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Zeroconf Discovery for Rigol DS1000Z-series scopes | |
-------------------------------------------------- | |
Documentation worth looking at: | |
* http://lxistandard.org/Documents/Specifications/LXI%20Device%20Specification%202011%20rev%201.4.pdf | |
* http://lxistandard.org/GuidesForUsingLXI/Introducing%20LXI%20To%20Your%20Network%20Administrator%20May%2024_2013.pdf | |
* http://lxistandard.org/GuidesForUsingLXI/LXI_Getting_Started_Guide_May_1_2013.pdf | |
* http://beyondmeasure.rigoltech.com/acton/attachment/1579/f-0386/1/-/-/-/-/DS1000Z_Programming%20Guide_EN.pdf | |
""" | |
from zeroconf import * | |
import socket | |
import time | |
import requests | |
from lxml import etree | |
import re | |
# Timer used to measure elapsed discovery time: the performance counter when
# the running interpreter provides one, plain wall-clock time otherwise.
clock = getattr(time, 'perf_counter', time.time)
class Listener(object):
    """
    Zeroconf service listener that collects the services it is told about.

    Every discovered service is stored in ``self.results`` as a dict with the
    keys ``zc_name``, ``zc_type`` and ``zc_info``. ``zc_info`` is whatever
    ``zeroconf.get_service_info()`` returned, which may be ``None`` when the
    service could not be resolved.

    filter_func: optional callable taking such a result dict; when given, only
    results for which it returns a true value are kept.
    """

    def __init__(self, filter_func=None):
        self.results = []
        self.filter_func = filter_func

    def remove_service(self, zeroconf, zc_type, zc_name):
        """Service removal callback; removals are deliberately ignored."""
        pass

    def update_service(self, zeroconf, zc_type, zc_name):
        """
        Service update callback; updates are deliberately ignored.
        Present because newer python-zeroconf releases expect listeners to
        provide this method.
        """
        pass

    def add_service(self, zeroconf, zc_type, zc_name):
        """Service discovery callback: resolve the service and record it."""
        zc_info = zeroconf.get_service_info(zc_type, zc_name)
        if zc_info is not None:
            # Drop TXT entries without a value so consumers can rely on every
            # remaining property having one.
            # NOTE(review): this writes a private attribute of the info object,
            # as the original code did -- confirm it still has the intended
            # effect with the zeroconf version in use.
            zc_info._properties = {k: v for k, v in zc_info.properties.items() if v is not None}
        result = {
            'zc_name': zc_name,
            'zc_type': zc_type,
            'zc_info': zc_info,
        }
        if not self.filter_func or self.filter_func(result):
            self.results.append(result)

    @staticmethod
    def pprint(zc_name, zc_type, zc_info):
        """
        Print a human readable summary of one discovered service.
        The parameters match the keys of the dicts stored in ``results``, so a
        result can be printed with ``Listener.pprint(**result)``.
        """
        print('\nService "{0}" found'.format(zc_name))
        print('\tType: {0}'.format(zc_type))
        if not zc_info:
            return
        print('\tAddress: {0}:{1}'.format(socket.inet_ntoa(zc_info.address), zc_info.port))
        print('\tServer name: {0}'.format(zc_info.server))
        prop = zc_info.properties
        if not prop:
            return
        print('\tProperties:')
        for key, value in prop.items():
            if not value:
                continue
            # Replace undecodable bytes instead of aborting the whole listing.
            print('\t\t{0}: {1}'.format(key.decode('ascii', 'replace'), value.decode('ascii', 'replace')))
def get_ds1000z_results(if_any_return_after=0.8, timeout=2.5):
    """
    Zeroconf service discovery of "_scpi-raw._tcp.local."
    The results are filtered for entries matching the Rigol DS1000Z scope series.

    if_any_return_after: seconds after which the function returns as soon as
                         at least one matching service has been found.
    timeout: seconds after which the function returns regardless of whether
             anything was found.
    Returns the list of result dicts collected by the Listener (possibly empty).
    """
    model_pattern = re.compile(br'DS1\d\d\dZ')
    manufacturer_pattern = re.compile(br'RIGOL TECHNOLOGIES')

    def ds1000z_filter(result):
        # Unresolved services and services that do not announce the expected
        # TXT keys are simply "not a DS1000Z" instead of an error.
        zc_info = result['zc_info']
        properties = (zc_info.properties if zc_info else None) or {}
        model = properties.get(b'Model') or b''
        manufacturer = properties.get(b'Manufacturer') or b''
        return bool(model_pattern.match(model) and manufacturer_pattern.match(manufacturer))

    zc = Zeroconf()
    try:
        listener = Listener(filter_func=ds1000z_filter)
        # The reference is kept so the browser stays alive while we wait.
        browser = ServiceBrowser(zc, '_scpi-raw._tcp.local.', listener=listener)
        start = clock()
        while True:
            # The listener is filled in by the browser while this function
            # waits, so poll its results until one of the deadlines passes.
            et = clock() - start  # elapsed time
            if listener.results and et >= if_any_return_after:
                break
            if et >= timeout:
                break
            time.sleep(0.005)
    finally:
        # Always shut Zeroconf down, even if the wait loop is interrupted.
        zc.close()
    return listener.results
def get_scpi_connection_tuple(http_connection_tuple, timeout=5.0):
    """
    Determine the raw SCPI endpoint of an LXI instrument from its web interface.

    http_connection_tuple: (address, port) of the instrument's HTTP server.
    timeout: seconds to wait for the HTTP server before requests raises
             (without it a silent instrument would block forever).
    Returns (scpi_address, scpi_port), or (None, None) if no suitable
    interface is listed in the identification document.

    * Get XML config from http://<address>:<port>/lxi/identification
    * My scope has malformed XML in the namespace attributes, where there is a newline before the closing quote, causing
      the parser to fail. We should just be able to concat the whole string together by removing newlines.
    * Use XPath selector: "ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString" with the
      "http://www.lxistandard.org/InstrumentIdentification/1.0" namespace.
    * For each InstrumentAddressString, split on "::" and look for an IP address followed by a port
    * My scope yields a VISA type of "INSTR" for both TCPIP interfaces, when technically it should be "SOCKET" I think
      (see: http://zone.ni.com/reference/en-XX/help/371361J-01/lvinstio/visa_resource_name_generic/ and
      http://digital.ni.com/public.nsf/allkb/6A9285AC83C646BA86256BDC004FD4D4)
    * Guessing that an address with no port, or port 80, is the web interface, assume the first one we come across with a
      high-range port is our SCPI interface.
    * By convention this is port 5025, but Rigol has chosen 5555.
    """
    lxi_ident_url = 'http://{0}:{1}/lxi/identification'.format(*http_connection_tuple)
    r = requests.get(lxi_ident_url, timeout=timeout)
    doc = etree.fromstring(r.content.replace(b'\n', b''))
    namespaces = {'ns': 'http://www.lxistandard.org/InstrumentIdentification/1.0'}
    selector = "ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString"
    for e in doc.xpath(selector, namespaces=namespaces):
        # An empty element has no text at all; treat it like an empty string.
        visa_resource = (e.text or '').split('::')
        if not visa_resource[0].startswith('TCPIP'):
            continue
        # Everything between the interface type and the trailing resource class.
        address = visa_resource[1:-1]
        if len(address) != 2:
            continue
        try:
            port = int(address[1])
        except ValueError:
            # The second field is a device name (e.g. "inst0"), not a port.
            continue
        if port > 1024:
            # This is most likely our SCPI address.
            return (address[0], port)
    return (None, None)
def test_scpi(scpi_connection_tuple):
    """
    Toggle the acquisition state of the scope as a simple connectivity test.

    Connects to the raw SCPI socket, queries the trigger status and then sends
    :RUN if the scope reported STOP, or :STOP otherwise.

    scpi_connection_tuple: (address, port) of the instrument's SCPI socket.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        s.connect(scpi_connection_tuple)
        # NOTE(review): the commands are sent without a trailing newline, as in
        # the original code -- confirm the instrument accepts them this way.
        s.sendall(b':TRIG:STAT?')
        trig_status = s.recv(32)
        if trig_status.strip() == b'STOP':
            print('Starting acquisition now...')
            s.sendall(b':RUN')
        else:
            print('Stoping acquisition now...')
            s.sendall(b':STOP')
    finally:
        # Release the socket even if connecting or talking to the scope fails.
        s.close()
def main():
    """
    Discover DS1000Z scopes, print what was found and run the SCPI test
    against every scope for which a SCPI endpoint could be determined.
    """
    results = get_ds1000z_results()
    for result in results:
        Listener.pprint(**result)
    print()
    for result in results:
        zc_info = result['zc_info']
        address = socket.inet_ntoa(zc_info.address)
        print('Trying to connect to {}...'.format(address))
        scpi_connection = get_scpi_connection_tuple((address, zc_info.port))
        # Compare by value: an identity test against a tuple literal is always
        # true and would let (None, None) through to the socket code.
        if scpi_connection != (None, None):
            test_scpi(scpi_connection)
        print()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment