Advertisement
Guest User

Untitled

a guest
May 25th, 2019
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.52 KB | None | 0 0
  1. import json
  2. import logging
  3. import threading
  4. import fibre
  5. from fibre.utils import Logger
  6.  
  7. logger = logging.getLogger(__name__)
  8. logger.setLevel(logging.DEBUG)
  9. fibre_logger = Logger(verbose=logger.getEffectiveLevel() == logging.DEBUG)
  10.  
  11.  
  12. class OdriveManager:
  13. _CONFIG_KEY = 'config'
  14. _CRC_KEY = 'crc16'
  15.  
  16. def __init__(self, serial_number, cache_file=None):
  17. self.channel = None
  18. self.serial_number = serial_number
  19. self.cache_file = cache_file or 'odrive-cache-{serial_number}.json' \
  20. .format(serial_number=serial_number)
  21.  
  22. def setup_odrive(self, config, crc):
  23. """
  24. Set up fibre remote object using given configuration.
  25. """
  26.  
  27. json_data = {"name": "fibre_node", "members": config}
  28. self.channel._interface_definition_crc = crc
  29.  
  30. # Just some black magic
  31. # Nothing to see here
  32. logger.debug('Making fibre remote object connection.')
  33. obj = fibre.remote_object.RemoteObject(
  34. json_data,
  35. None,
  36. self.channel,
  37. fibre_logger,
  38. )
  39. obj.__dict__['_json_data'] = json_data['members']
  40. obj.__dict__['_json_crc'] = crc
  41.  
  42. return obj
  43.  
  44. def read_config_from_device(self):
  45. """
  46. Read fibre connection configuration from odrive device directly, then
  47. cache it in a JSON file.
  48. """
  49.  
  50. json_bytes = self.channel.remote_endpoint_read_buffer(0)
  51.  
  52. try:
  53. json_string = json_bytes.decode('ascii')
  54. except UnicodeDecodeError as e:
  55. logger.warning('Loaded JSON bytes not valid ascii: %s.', str(e))
  56. return None
  57.  
  58. logger.debug('Calculating crc for JSON string.')
  59. crc = fibre.protocol.calc_crc16(
  60. fibre.protocol.PROTOCOL_VERSION,
  61. json_bytes,
  62. )
  63.  
  64. try:
  65. config = json.loads(json_string)
  66. except json.decoder.JSONDecodeError as e:
  67. logger.warning(
  68. 'Loaded JSON string from odrive invalid: %s.',
  69. str(e),
  70. )
  71. return None
  72.  
  73. od = self.setup_odrive(config=config, crc=crc)
  74.  
  75. logger.info('Caching JSON data in file.')
  76. with open(self.cache_file, 'w') as f:
  77. json.dump({self._CONFIG_KEY: config, self._CRC_KEY: crc}, f)
  78.  
  79. return od
  80.  
  81. def find_odrive(self):
  82. """
  83. Find odrive device and set up fibre connection.
  84. """
  85.  
  86. cancellation_token = threading.Event()
  87.  
  88. def callback(channel):
  89. self.channel = channel
  90. cancellation_token.set()
  91.  
  92. fibre.usbbulk_transport.discover_channels(
  93. path=None,
  94. callback=callback,
  95. logger=fibre_logger,
  96. serial_number=self.serial_number,
  97. cancellation_token=cancellation_token,
  98. channel_termination_token=None,
  99. )
  100.  
  101. try:
  102. with open(self.cache_file, 'r') as f:
  103. data = json.load(f)
  104. config = data[self._CONFIG_KEY]
  105. crc = data[self._CRC_KEY]
  106.  
  107. # TODO: Handle bad checksum
  108. od = self.setup_odrive(config=config, crc=crc)
  109. return od
  110.  
  111. except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError) as e:
  112. logger.warning('Could not read cached odrive configuration (%s). '
  113. 'Reading from device.', str(e))
  114.  
  115. od = self.read_config_from_device()
  116. return od
  117.  
  118.  
  119. if __name__ == '__main__':
  120. od = OdriveManager(serial_number='20763382304B').find_odrive()
  121.  
  122. print(od)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement