-
-
Save IamPhytan/1572711baee2a1ba0d425da769f5afe8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import threading | |
import fibre | |
from fibre.utils import Logger | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
fibre_logger = Logger(verbose=logger.getEffectiveLevel() == logging.DEBUG) | |
class OdriveManager: | |
_CONFIG_KEY = 'config' | |
_CRC_KEY = 'crc16' | |
def __init__(self, serial_number, cache_file=None): | |
self.channel = None | |
self.serial_number = serial_number | |
self.cache_file = cache_file or 'odrive-cache-{serial_number}.json' \ | |
.format(serial_number=serial_number) | |
def setup_odrive(self, config, crc): | |
""" | |
Set up fibre remote object using given configuration. | |
""" | |
json_data = {"name": "fibre_node", "members": config} | |
self.channel._interface_definition_crc = crc | |
# Just some black magic | |
# Nothing to see here | |
logger.debug('Making fibre remote object connection.') | |
obj = fibre.remote_object.RemoteObject( | |
json_data, | |
None, | |
self.channel, | |
fibre_logger, | |
) | |
obj.__dict__['_json_data'] = json_data['members'] | |
obj.__dict__['_json_crc'] = crc | |
return obj | |
def read_config_from_device(self): | |
""" | |
Read fibre connection configuration from odrive device directly, then | |
cache it in a JSON file. | |
""" | |
json_bytes = self.channel.remote_endpoint_read_buffer(0) | |
try: | |
json_string = json_bytes.decode('ascii') | |
except UnicodeDecodeError as e: | |
logger.warning('Loaded JSON bytes not valid ascii: %s.', str(e)) | |
return None | |
logger.debug('Calculating crc for JSON string.') | |
crc = fibre.protocol.calc_crc16( | |
fibre.protocol.PROTOCOL_VERSION, | |
json_bytes, | |
) | |
try: | |
config = json.loads(json_string) | |
except json.decoder.JSONDecodeError as e: | |
logger.warning( | |
'Loaded JSON string from odrive invalid: %s.', | |
str(e), | |
) | |
return None | |
od = self.setup_odrive(config=config, crc=crc) | |
logger.info('Caching JSON data in file.') | |
with open(self.cache_file, 'w') as f: | |
json.dump({self._CONFIG_KEY: config, self._CRC_KEY: crc}, f) | |
return od | |
def find_odrive(self): | |
""" | |
Find odrive device and set up fibre connection. | |
""" | |
cancellation_token = threading.Event() | |
def callback(channel): | |
self.channel = channel | |
cancellation_token.set() | |
fibre.usbbulk_transport.discover_channels( | |
path=None, | |
callback=callback, | |
logger=fibre_logger, | |
serial_number=self.serial_number, | |
cancellation_token=cancellation_token, | |
channel_termination_token=None, | |
) | |
try: | |
with open(self.cache_file, 'r') as f: | |
data = json.load(f) | |
config = data[self._CONFIG_KEY] | |
crc = data[self._CRC_KEY] | |
# TODO: Handle bad checksum | |
od = self.setup_odrive(config=config, crc=crc) | |
return od | |
except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError) as e: | |
logger.warning('Could not read cached odrive configuration (%s). ' | |
'Reading from device.', str(e)) | |
od = self.read_config_from_device() | |
return od | |
if __name__ == '__main__': | |
od = OdriveManager(serial_number='20763382304B').find_odrive() | |
print(od) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment