Advertisement
Guest User

BLE led mask connector

a guest
Apr 14th, 2021
5,480
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.53 KB | None | 0 0
  1. # file: shining_mask.py
  2. # desc: several helper methods to discover Shining Masks over
  3. #       Bluetooth BLE and send messages to change faces.
  4. #
  5. # requirements: pybluez, gattlib, pygatt
  6. #
  7. # references: The following references can be used to
  8. #             set up your RPi BT BLE dev environment.
  9. #
  10. #   Sniffing BT traffic:
  11. #       http://nilhcem.com/iot/reverse-engineering-simple-bluetooth-devices
  12. #
  13. #   RPi BT Setup Guides:
  14. #       https://cdn-learn.adafruit.com/downloads/pdf/install-bluez-on-the-raspberry-pi.pdf
  15. #       https://www.bluetooth.com/wp-content/uploads/2019/03/1908_Tutorial-How-to-set-up-BlueZ__UpdateLFC_FINAL.pdf
  16.  
  17. import time
  18. import logging
  19. import binascii
  20. import pygatt
  21. import bluetooth
  22. from bluetooth.ble import DiscoveryService
  23.  
  24. # `mask_codes.py` should contain a list of tuples
  25. # [(0x00000000, bytes.fromhex("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"), "name")...]
  26. import mask_codes
  27.  
  28.  
  29. # Configure with your values
  30. address = '' # Device MAC Address
  31. connect_timeout = 5 # Time in seconds to connect to device
  32. code_cycle_timeout = 2 # Time in seconds to cycle to next mask code
  33. handle_cycle_timeout = 2 # Time in seconds to cycle to next handle
  34.  
  35. # Enable logging for pygatt
  36. logging.basicConfig()
  37. logging.getLogger('pygatt').setLevel(logging.DEBUG)
  38.  
  39. # Initialize with all mask codes
  40. codes = mask_codes.codes()
  41.  
  42. # Global variables
  43. adapter = None
  44. device = None
  45. subscriptions = []
  46. handles = [] # List of device characteristics
  47. handle_index = 0 # Index used to cycle through handles
  48. message_handle = "" # Handle where messages will be sent
  49. code_index = 0 # Index used to cycle through codes
  50.  
  51. def init(address=address, timeout=connect_timeout):
  52.     """
  53.    Initialize and start BLE adapter then connect to device
  54.    """
  55.     # Initalize BLE adapter
  56.     global adapter
  57.     global device
  58.  
  59.     adapter = pygatt.GATTToolBackend()
  60.     adapter.start()
  61.  
  62.     if address:
  63.         device = connect(address, timeout)
  64.  
  65. def handle_subscribe(handle, value):
  66.     """
  67.    Handles subscription data
  68.    
  69.    handle -- integer, characteristic read handle the data was received on
  70.    value -- bytearray, the data returned in the notification
  71.    """
  72.     print("Received data: %s" % hexlify(value))
  73.    
  74. def scan_ble_devices():
  75.     """
  76.    Scan for BLE devices
  77.    
  78.    The name may not appear during the scan.
  79.    
  80.    So it is best to run this first with the mask turned off,
  81.    then turn it on and then see which MAC address is new.
  82.    """
  83.    
  84.     print("Staring DiscoveryService")
  85.     service = DiscoveryService()
  86.    
  87.     print("Discovering devices")
  88.     devices = service.discover(2)
  89.    
  90.     print("Scanning for BLE devices")
  91.     for addr, name in devices.items():
  92.         try:
  93.             print("  %s - %s" % (addr, name))
  94.         except UnicodeEncodeError:
  95.             print("  %s - %s" % (addr, name.encode('utf-8', 'replace')))
  96.  
  97. def cycle_codes(mac, handle, sleep_time=code_cycle_timeout, index=code_index):
  98.     """
  99.    Cycle through mask bytecodes recursively.
  100.    
  101.    mac -- string, MAC address for device
  102.    handle -- bytearray, characteristic handle receiver for message
  103.    sleep_time -- int, how long to sleep before recursing
  104.    index -- int, codes index
  105.    """
  106.     if not handle:
  107.         raise ValueError("handle is undefined")
  108.        
  109.     try:
  110.         print("Cycling to code at index %s in %ss on device %s" % (index, sleep_time, mac))
  111.         for code in codes[index:]:
  112.             print("Sleeping for %s seconds" % sleep_time)
  113.             time.sleep(sleep_time)
  114.             print("Pushing %s to Mask" % code[2])
  115.             device.char_write_handle(code[0], code[1], wait_for_response=False)
  116.  
  117.     except pygatt.exceptions.NotificationTimeout:
  118.         print("NotificationTimeout: cycling to code index %s" % index)
  119.         if index < len(codes) - 1:
  120.             index += 1
  121.             cycle_codes(mac, handle, sleep_time, index)
  122.         pass
  123.     except pygatt.exceptions.NotConnectedError:
  124.         print("NotConnectedError: cycling to code index %s" % index)
  125.         if index < len(codes) - 1:
  126.             index += 1
  127.             cycle_codes(mac, handle, sleep_time, index)
  128.         pass
  129.  
  130. def cycle_handles(mac, handle, sleep_time=handle_cycle_timeout, index=handle_index):
  131.     """
  132.    Cycle through mask bytecodes recursively.
  133.    
  134.    mac -- string, MAC address for device
  135.    handle -- bytearray, characteristic handle receiver for message
  136.    sleep_time -- int, how long to sleep before recursing
  137.    index -- int, codes index
  138.    """
  139.     try:
  140.         print("Cycling to handle at index %s in %ss on device %s" % (index, sleep_time, mac))
  141.         device.subscribe(handle,
  142.                         callback=handle_subscribe,
  143.                         indication=True)
  144.         value = device.char_read(handle)
  145.         time.sleep(sleep_time)
  146.         subscriptions.append(handle)
  147.         if index < len(handles) - 1:
  148.             index += 1
  149.             cycle_handles(mac, handles[index], sleep_time, index)
  150.         else:
  151.             list_subscriptions()
  152.             update_message_handle()
  153.            
  154.     except pygatt.exceptions.NotificationTimeout:
  155.         print("NotificationTimeout: cycling to handle index %s" % index)
  156.         if index < len(handles) - 1:
  157.             index += 1
  158.             cycle_handles(mac, handles[index], sleep_time, index)
  159.         else:
  160.             print(*subscriptions, sep = "\n")
  161.             update_message_handle()
  162.         pass
  163.        
  164.     except pygatt.exceptions.NotConnectedError:
  165.         print("NotConnectedError: cycling to handle index %s" % index)
  166.         if index < len(handles) - 1:
  167.             index += 1
  168.             cycle_handles(mac, handles[index], sleep_time, index)
  169.         else:
  170.             list_subscriptions()
  171.             update_message_handle()
  172.         pass
  173.  
  174. def list_subscriptions():
  175.     print("Subscribed handles:")
  176.     print(*subscriptions, sep = "\n")
  177.  
  178. def update_message_handle():
  179.     global message_handle
  180.     if len(subscriptions) > 0:
  181.         print("Assigning first subscription %s as message_handle" % subscriptions[0])
  182.         message_handle = subscriptions[0]
  183.  
  184. def get_characteristics():
  185.     """
  186.    Prints our device characteristics to identify
  187.    handles where messages should be sent.
  188.    
  189.    raises:
  190.        NameError: device undefined
  191.        NameError: adapter undefined
  192.    """
  193.     if not device:
  194.         raise NameError("device is undefined")
  195.  
  196.     if not adapter:
  197.         raise NameError("adapter is undefined")
  198.        
  199.     print("Getting device characteristics")
  200.     try:
  201.         for uuid in device.discover_characteristics().keys():
  202.             handles.append(uuid)
  203.             print("Read UUID %s: %s" % (uuid, binascii.hexlify(device.char_read(uuid))))
  204.     except pygatt.exceptions.NotificationTimeout:
  205.         pass
  206.     except pygatt.exceptions.NotConnectedError:
  207.         pass
  208.  
  209.            
  210. def connect(mac, timeout=connect_timeout):
  211.     """returns device, connection to device
  212.    
  213.    Connect to BLE device using MAC address
  214.    
  215.    mac -- string, MAC address of device
  216.    
  217.    """
  218.        
  219.     print("Connecting to %s, timeout=%s" % (mac, timeout))
  220.     try:
  221.         return adapter.connect(mac, timeout=timeout)
  222.     except pygatt.exceptions.NotificationTimeout:
  223.         stop()
  224.         pass
  225.     except pygatt.exceptions.NotConnectedError:
  226.         stop()
  227.         pass
  228.  
  229. def stop():
  230.     print("Stopping adapter")
  231.     global adapter
  232.     if adapter:
  233.         adapter.stop()
  234.    
  235. """
  236. Initialize adapater and device global variables
  237. """
  238. init()
  239.  
  240. """
  241. Scan for BLE devices and find the mask's MAC address.
  242.  
  243. Once found assign it to the address variable as a string
  244.  
  245. Example:
  246.    address = '00:00:00:00:00:00')
  247. """
  248. scan_ble_devices()
  249.  
  250. if not address:
  251.     stop()
  252.     raise ValueError("address is undefined, assign the device's MAC address before continuing.")
  253. """
  254. Get a list of all characteristics (handles) on the device.
  255.  
  256. Add the list of all characteristics string
  257. values (e.g. 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx')
  258. to the handles array above.
  259.  
  260. You will use this later to search for the
  261. characteristic / handle where you can send
  262. messages that will update the mask.
  263. """
  264. get_characteristics()
  265.  
  266. """
  267. Cycle through all handles and look for
  268. the first handle which does not timeout.
  269.  
  270. This will be the handle where you can send mask codes.
  271.  
  272. Assigns the first subscribed handle value to message_handle
  273. """
  274. cycle_handles(address, handles[handle_index])
  275.  
  276. """
  277. Cycles through mask codes on device and first subscription handle.
  278. """
  279. cycle_codes(address, message_handle)
  280.  
  281. stop()
  282.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement