Last active
June 17, 2022 16:28
-
-
Save SeanPesce/d2d788ca426f24d2c88848f32cd4a21c to your computer and use it in GitHub Desktop.
Python 3 classes for USB bulk device I/O
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Author: Sean Pesce | |
# Installing prerequisites: | |
# sudo pip3 install pyusb | |
# | |
# On Windows, you also need to install libusb: | |
# https://sourceforge.net/projects/libusb-win32/files/libusb-win32-releases/ | |
# Then, use inf-wizard.exe to create and install a libusb driver for the device. | |
# Note: this requires installation of an unsigned driver. | |
# On Linux, this script must be run as root. | |
# References: | |
# https://github.com/pyusb/pyusb/blob/master/docs/tutorial.rst | |
import usb.core | |
import usb.util | |
class USB_Device: | |
def __init__(self, vendor_id, product_id, name='USB Device', config_id=None): | |
self.vendor_id = vendor_id | |
self.product_id = product_id | |
self.product_name = name | |
self.dev = usb.core.find(idVendor=vendor_id, idProduct=product_id) | |
assert self.dev is not None, f'Failed to find {self.product_name}' | |
self.dev.reset() | |
# Set the active configuration. With no arguments, the first configuration will be the active one | |
if config_id is None: | |
self.dev.set_configuration() | |
else: | |
self.dev.set_configuration(config_id) | |
self.cfg = self.dev.get_active_configuration() | |
self.iface = self.cfg[(0,0)] | |
def reset(self): | |
return self.dev.reset() | |
class USB_Bulk_Device(USB_Device): | |
def __init__(self, vendor_id, product_id, name='USB Bulk Device', config_id=None): | |
super().__init__(vendor_id, product_id, name, config_id) | |
self.in_endp = usb.util.find_descriptor( | |
self.iface, | |
# Match the first IN endpoint | |
custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN) | |
assert self.in_endp is not None, 'Failed to find IN endpoint' | |
self.out_endp = usb.util.find_descriptor( | |
self.iface, | |
# Match the first OUT endpoint | |
custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT) | |
assert self.out_endp is not None, 'Failed to find OUT endpoint' | |
def clear_halt(self): | |
self.dev.clear_halt(self.out_endp) | |
self.dev.clear_halt(self.in_endp) | |
def write(self, data, encoding='utf8'): | |
if type(data) == str: | |
data = data.encode(encoding) | |
data = bytearray(data) | |
written = 0 # Total bytes written | |
wrote_once = False # Flag to allow writing an empty message | |
while data or not wrote_once: | |
nwrite = self.dev.write(self.out_endp.bEndpointAddress, data[:self.out_endp.wMaxPacketSize]) | |
data = data[nwrite:] | |
wrote_once = True | |
written += nwrite | |
return written | |
def read(self, size, timeout=None): | |
""" | |
Attempts to read the specified number of bytes from the USB device. This function does NOT | |
guarantee that the specified number of bytes will be returned. Multiple reads are performed | |
ONLY IF the read size is greater than the maximum packet size of the endpoint. | |
""" | |
data = bytearray() | |
read_more = True | |
while read_more: | |
remaining = size - len(data) | |
if remaining > self.in_endp.wMaxPacketSize: | |
read_more = True | |
else: | |
read_more = False | |
read_data = self.dev.read(self.in_endp.bEndpointAddress, min(self.in_endp.wMaxPacketSize, remaining), timeout) | |
data += read_data | |
return data |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment