Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # file: shining_mask.py
- # desc: several helper methods to discover Shining Masks over
- # Bluetooth BLE and send messages to change faces.
- #
- # requirements: pybluez, gattlib, pygatt
- #
- # references: The follwing references can be used to
- # set up your RPi BT BLE dev environment.
- #
- # Sniffing BT traffic:
- # http://nilhcem.com/iot/reverse-engineering-simple-bluetooth-devices
- #
- # RPi BT Setup Guides:
- # https://cdn-learn.adafruit.com/downloads/pdf/install-bluez-on-the-raspberry-pi.pdf
- # https://www.bluetooth.com/wp-content/uploads/2019/03/1908_Tutorial-How-to-set-up-BlueZ__UpdateLFC_FINAL.pdf
- import time
- import logging
- import binascii
- import pygatt
- import bluetooth
- from bluetooth.ble import DiscoveryService
- # `mask_codes.py` should contain a list of tuples
- # [(0x00000000, bytes.fromhex("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"), "name")...]
- import mask_codes
- # Configure with your values
- address = '' # Device MAC Address
- connect_timeout = 5 # Time in seconds to connect to device
- code_cycle_timeout = 2 # Time in seconds to cycle to next mask code
- handle_cycle_timeout = 2 # Time in seconds to cycle to next handle
- # Enable logging for pygatt
- logging.basicConfig()
- logging.getLogger('pygatt').setLevel(logging.DEBUG)
- # Initalize with all mask codes
- codes = mask_codes.codes()
- # Global variables
- adapter = None
- device = None
- subscriptions = []
- handles = [] # List of device characteristics
- handle_index = 0 # Index used to cycle through handles
- message_handle = "" # Handle where messages will be sent
- code_index = 0 # Index used to cycle through codes
- def init(address=address, timeout=connect_timeout):
- """
- Initialize and start BLE adapter then connect to device
- """
- # Initalize BLE adapter
- global adapter
- global device
- adapter = pygatt.GATTToolBackend()
- adapter.start()
- if address:
- device = connect(address, timeout)
- def handle_subscribe(handle, value):
- """
- Handles subscription data
- handle -- integer, characteristic read handle the data was received on
- value -- bytearray, the data returned in the notification
- """
- print("Received data: %s" % hexlify(value))
- def scan_ble_devices():
- """
- Scan for BLE devices
- The name may not appear during the scan.
- So it is best to run this first with the mask turned off,
- then turn it on and then see which MAC address is new.
- """
- print("Staring DiscoveryService")
- service = DiscoveryService()
- print("Discovering devices")
- devices = service.discover(2)
- print("Scanning for BLE devices")
- for addr, name in devices.items():
- try:
- print(" %s - %s" % (addr, name))
- except UnicodeEncodeError:
- print(" %s - %s" % (addr, name.encode('utf-8', 'replace')))
- def cycle_codes(mac, handle, sleep_time=code_cycle_timeout, index=code_index):
- """
- Cycle through mask bytecodes recursively.
- mac -- string, MAC address for device
- handle -- bytearray, characteristic handle receiver for message
- sleep_time -- int, how long to sleep before recursing
- index -- int, codes index
- """
- if not handle:
- raise ValueError("handle is undefined")
- try:
- print("Cycling to code at index %s in %ss on device %s" % (index, sleep_time, mac))
- for code in codes[index:]:
- print("Sleeping for %s seconds" % sleep_time)
- time.sleep(sleep_time)
- print("Pushing %s to Mask" % code[2])
- device.char_write_handle(code[0], code[1], wait_for_response=False)
- except pygatt.exceptions.NotificationTimeout:
- print("NotificationTimeout: cycling to code index %s" % index)
- if index < len(codes) - 1:
- index += 1
- cycle_codes(mac, handle, sleep_time, index)
- pass
- except pygatt.exceptions.NotConnectedError:
- print("NotConnectedError: cycling to code index %s" % index)
- if index < len(codes) - 1:
- index += 1
- cycle_codes(mac, handle, sleep_time, index)
- pass
- def cycle_handles(mac, handle, sleep_time=handle_cycle_timeout, index=handle_index):
- """
- Cycle through mask bytecodes recursively.
- mac -- string, MAC address for device
- handle -- bytearray, characteristic handle receiver for message
- sleep_time -- int, how long to sleep before recursing
- index -- int, codes index
- """
- try:
- print("Cycling to handle at index %s in %ss on device %s" % (index, sleep_time, mac))
- device.subscribe(handle,
- callback=handle_subscribe,
- indication=True)
- value = device.char_read(handle)
- time.sleep(sleep_time)
- subscriptions.append(handle)
- if index < len(handles) - 1:
- index += 1
- cycle_handles(mac, handles[index], sleep_time, index)
- else:
- list_subscriptions()
- update_message_handle()
- except pygatt.exceptions.NotificationTimeout:
- print("NotificationTimeout: cycling to handle index %s" % index)
- if index < len(handles) - 1:
- index += 1
- cycle_handles(mac, handles[index], sleep_time, index)
- else:
- print(*subscriptions, sep = "\n")
- update_message_handle()
- pass
- except pygatt.exceptions.NotConnectedError:
- print("NotConnectedError: cycling to handle index %s" % index)
- if index < len(handles) - 1:
- index += 1
- cycle_handles(mac, handles[index], sleep_time, index)
- else:
- list_subscriptions()
- update_message_handle()
- pass
- def list_subscriptions():
- print("Subscribed handles:")
- print(*subscriptions, sep = "\n")
- def update_message_handle():
- global message_handle
- if len(subscriptions) > 0:
- print("Assigning first subscription %s as message_handle" % subscriptions[0])
- message_handle = subscriptions[0]
- def get_characteristics():
- """
- Prints our device characteristics to identify
- handles where messages should be sent.
- raises:
- NameError: device undefined
- NameError: adapter undefined
- """
- if not device:
- raise NameError("device is undefined")
- if not adapter:
- raise NameError("adapter is undefined")
- print("Getting device characteristics")
- try:
- for uuid in device.discover_characteristics().keys():
- handles.append(uuid)
- print("Read UUID %s: %s" % (uuid, binascii.hexlify(device.char_read(uuid))))
- except pygatt.exceptions.NotificationTimeout:
- pass
- except pygatt.exceptions.NotConnectedError:
- pass
- def connect(mac, timeout=connect_timeout):
- """returns device, connection to device
- Connect to BLE device using MAC address
- mac -- string, MAC address of device
- """
- print("Connecting to %s, timeout=%s" % (mac, timeout))
- try:
- return adapter.connect(mac, timeout=timeout)
- except pygatt.exceptions.NotificationTimeout:
- stop()
- pass
- except pygatt.exceptions.NotConnectedError:
- stop()
- pass
- def stop():
- print("Stopping adapter")
- global adapter
- if adapter:
- adapter.stop()
- """
- Initialize adapater and device global variables
- """
- init()
- """
- Scan for BLE devices and find the mask's MAC address.
- Once found assign it to the address variable as a string
- Example:
- address = '00:00:00:00:00:00')
- """
- scan_ble_devices()
- if not address:
- stop()
- raise ValueError("address is undefined, assign the device's MAC address before continuing.")
- """
- Get a list of all characteristics (handles) on the device.
- Add the list of all characteristics string
- values (e.g. 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx')
- to the handles array above.
- You will use this later to search for the
- characteristic / handle where you can send
- messages that will update the mask.
- """
- get_characteristics()
- """
- Cycle through all handles and look for
- the first handle which does not timeout.
- This will be the handle where you can send mask codes.
- Assigns the first subscribed handle value to message_handle
- """
- cycle_handles(address, handles[handle_index])
- """
- Cycles through mask codes on device and first subscription handle.
- """
- cycle_codes(address, message_handle)
- stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement