Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import usb.core
- import usb.util
- import sys
- from array import *
- from time import sleep
- # look for a fitbit flex, zip base station
- dev = usb.core.find(idVendor=0x2687, idProduct=0xfb01)
- if dev is None:
- raise ValueError('Couldn\'t find a Flex/Zip/One base station')
- print('\n--=== Found a Fitbit Flex/Zip/One base station ===--\n')
- sys.stdout.write('Device class: ' + str(dev.bDeviceClass) + '\n')
- cfg = dev.get_active_configuration();
- # first interface is for passing data
- intfData = cfg[(0,0)]
- if dev.is_kernel_driver_active(0) is True:
- print('we need to detach kernel driver for the Data interface')
- try:
- # unload the kernel driver
- dev.detach_kernel_driver(0)
- except usb.core.USBError as err:
- sys.exit("Could not detach the kernel driver for Data interface: %s" % str(err))
- # claim the interface
- # usb.util.claim_interface(0)
- if dev.is_kernel_driver_active(1) is True:
- print('we need to detach kernel driver for the Ctrl interface')
- try:
- # unload the kernel driver
- dev.detach_kernel_driver(1)
- except usb.core.USBError as err:
- sys.exit("Could not detach the kernel driver for Ctrl interface: %s" % str(err))
- # claim the interface
- # usb.util.claim_interface(1)
- dev.set_configuration();
- # second interface is for control
- intfCtrl = cfg[(1,0)]
- # unload the kernel driver
- # get the outgoing data endpoint (0x01) and incoming data ep (0x81)
- ep81DataIn = intfData[0]
- ep01DataOut = intfData[1]
- # get the outgoing control endpoint (0x02) and incoming ep (0x82)
- ep82CtrlIn = intfData[0]
- ep02CtrlOut = intfData[1]
- START_DISC = array('B',[0x1a,0x04,0xba,0x56,0x89,0xa6,0xfa,0xbf, \
- 0xa2,0xbd,0x01,0x46,0x7d,0x6e,0x00,0x00, \
- 0xab,0xad,0x00,0xfb,0x01,0xfb,0x02,0xfb, \
- 0xa0,0x0f,0x00,0xd3,0x00,0x00,0x00,0x00])
- CANCEL_DISC = array('B',[0x02,0x05,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00])
- # NOTE: My tracker ID is hardcoded in this command!
- # || || || || || ||
- ESTAB_LINK = array('B',[0x0B,0x06,0x4c,0x52,0x5b,0xf9,0xab,0xc8, \
- 0x01,0x00,0xfb,0x1b,0x00,0x00,0x00,0x07, \
- 0x00,0x00,0x00,0x71,0x00,0x00,0x00,0x02, \
- 0x00,0x00,0x00,0xee,0x00,0x00,0x00,0x00])
- ENABLE_TX_PIPE = array('B',[0x03,0x08,0x01,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00])
- DISABLE_TX_PIPE = array('B',[0x03,0x08,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00])
- INIT_AIRLINK = array('B',[0xc0,0x0a,0x0a,0x00,0x06,0x00,0x06,0x00, \
- 0x00,0x00,0xc8,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x0c])
- START_MEGADUMP = array('B',[0xc0,0x10,0x0d,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x03])
- TERM_AIRLINK = array('B',[0x02,0x07,0x00,0x00,0x00,0x00,0x06,0x00, \
- 0x00,0x00,0xc8,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00, \
- 0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00])
- xbuf = array('B',[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0])
- def printhex( data ):
- r = ''
- for num in ret:
- r += '%02X' % num
- r += ' '
- print( r )
- # -------------------------------------------------------------
- # Step 1, cancel discovery & terminate link (force disconnect)
- # -------------------------------------------------------------
- print ('\nCancel discovery and terminate link (reset)...')
- xbuf[0] = 0x02
- xbuf[1] = 0x02
- xbuf[2] = 0x00
- dev.write(0x02, xbuf, intfCtrl.bInterfaceNumber, 32)
- # expect: 0x20, 0x01, "CancelDiscovery" within ~2-5ms
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on cancel discovery & terminate handshake (1)")
- printhex( ret )
- # expect 0x20, 0x01, "TerminateLink" within ~2ms
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on cancel discovery handshake (2)")
- printhex( ret )
- # # expect 0x03, 0x02, 0x00, terminate link acknowledgement
- # however, it doesn't matter if not received. FBC doesn't care either.
- # try:
- # ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 500)
- # except:
- # sys.exit("Timeout on cancel discovery handshake (3)")
- # printhex( ret )
- # -------------------------------------------------------------
- # Step 4, get dongle (base station) info
- # not really necessary to do a sync(?)
- # -------------------------------------------------------------
- print ('\nGet dongle info...')
- # ??? wait ~ 500ms before issuing this command?
- sleep(0.500)
- xbuf[0] = 0x02
- xbuf[1] = 0x01
- xbuf[2] = 0x00
- dev.write(0x02, xbuf, intfCtrl.bInterfaceNumber, 32)
- # expect reply with dongle info within ~2ms
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on get dongle info handshake")
- printhex( ret )
- # -------------------------------------------------------------
- # Step 6, start discovery, wait up to 2 seconds for a response
- # -------------------------------------------------------------
- print ('\nStart discovery...')
- # send 0x1a, 0x04...
- dev.write(0x02, START_DISC, intfCtrl.bInterfaceNumber, 32)
- # expect: 0x20, 0x01, "StartDiscovery"
- # about 4 ms after prev command
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on start discovery handshake")
- printhex( ret )
- # TODO: if no response, cancel discovery and repeat
- #
- print ('\nWaiting for discovery...')
- # NOTE: FitbitConnect program only waits about 2000 ms before
- # giving up and starting over.
- # expect: 0x13, 0x03 plus tracker ID plus RSSI plus other unknown stuff
- # FitbitConnect doesn't seem to accept less than about
- # -80 dBm for RSSI.
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 10000)
- except:
- sys.exit("No answer on discovery - tracker not found.")
- printhex( ret )
- # expect: 0x03, 0x02, 0x01
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 10000)
- except:
- sys.exit("No ACK discovery - tracker found but what went wrong?")
- printhex( ret )
- # extract tracker ID from data, same as NFC ID
- # TODO:
- # if response received, get tracker ID and feed it back to dongle with
- # EstablishLink command
- # -------------------------------------------------------------
- # Step 9, cancel discovery in prep for connecting to tracker
- # -------------------------------------------------------------
- print ('\nGot a tracker, cancel discovery...')
- # send 0x02, 0x05
- dev.write(0x02, CANCEL_DISC, intfCtrl.bInterfaceNumber, 32)
- # expect: 0x03, 0x02, 0x01
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on cancel discovery handshake")
- printhex( ret )
- sleep(2.000)
- # expect: 0x20, 0x01, "CancelDiscovery"
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on cancel discovery handshake")
- printhex( ret )
- # -------------------------------------------------------------
- # Step 11, establish link, wait up to 2 seconds for a response
- # -------------------------------------------------------------
- print ('\nEstablishing link...')
- # TODO: use tracker ID from discovery in this pkt, not hardcoded!
- # send 0x0b, 0x06...
- dev.write(0x02, ESTAB_LINK, intfCtrl.bInterfaceNumber, 32)
- # expect: 0x20, 0x01, "EstablishLink" on read
- # dongle ACKing the request to link to tracker
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 5000)
- except:
- sys.exit("Timeout on establish link handshake (1)")
- printhex( ret )
- # expect: 0x03, 0x04 on read
- # dongle indicating link made?
- # about 400 ms after previous recv pkt
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 5000)
- except:
- sys.exit("Timeout on establish link handshake (2)")
- printhex( ret )
- # expect: 0x20, 0x01, "GAP_LINK_ESTABLISHED_EVENT" on read
- # dongle indicating link made.
- # about 1 ms after previous recv pkt
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 15000)
- except:
- sys.exit("Timeout on establish link handshake (3)")
- printhex( ret )
- # expect: 0x02, 0x07 on read
- # dongle indicating what?
- # about 750 to 1600 ms after previous recv pkt
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 15000)
- except:
- # sys.exit("Timeout on establish link handshake (4)")
- print("Timeout on establish link handshake (4), proceeding anyway")
- printhex( ret )
- # -------------------------------------------------------------
- # Step 16, enable TX pipe, wait up to 2 seconds for a response
- # -------------------------------------------------------------
- print ('\nEnable TX pipe...')
- # send 0x03, 0x08, 0x01
- dev.write(0x02, ENABLE_TX_PIPE, intfCtrl.bInterfaceNumber, 32)
- # note response is from data in endpoint
- # expect: 0xc0, 0x0b on read
- # about 300 ms after previous command
- #
- try:
- ret = dev.read (0x81, 32, intfData.bInterfaceNumber, 5000)
- except:
- # sys.exit("Timeout on enable TX pipe handshake")
- print( "Hrmmm, didn't get expected C00B from the data intf, proceeding")
- printhex( ret )
- # -------------------------------------------------------------
- # Step 18, initialize airlink, wait up to 5 seconds for a response
- # -------------------------------------------------------------
- print ('\nInit airlink...')
- # send 0xc0, 0x0a, 0x0a ...
- dev.write(0x01, INIT_AIRLINK, intfData.bInterfaceNumber, 32)
- # TX pipe open/airlink up
- # expect: 0x08, 0x06 on read
- # about 3000 ms after prev command on data intf
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 10000)
- except:
- sys.exit("Timeout on init airlink handshake (1)")
- printhex( ret )
- # airlink up, read tracker info
- # @#@#@# this is where it's hanging up...
- # expect: 0xc0, 0x14 plus tracker ID on read
- # about 100 ms after prev recv pkt on ctrl intf
- try:
- ret = dev.read (0x81, 32, intfData.bInterfaceNumber, 10000)
- except:
- sys.exit("Timeout on init airlink handshake (2)")
- printhex( ret )
- # -------------------------------------------------------------
- # Step 21, begin megadump
- # -------------------------------------------------------------
- print ('\nBegin megadump...')
- # send 0xc0, 0x10, 0x0d
- dev.write(0x01, START_MEGADUMP, intfData.bInterfaceNumber, 32)
- # Read expected ACK to start of megadump
- try:
- ret = dev.read (0x81, 32, intfData.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on begin megadump handshake")
- printhex( ret )
- # expect: 0xc0 0x41 0x0d
- try:
- ret = dev.read (0x81, 32, intfData.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on megadump header")
- printhex( ret )
- # Read expected header of megadump (starts with 0x28 0x02
- try:
- ret = dev.read (0x81, 32, intfData.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on megadump header")
- printhex( ret )
- # -------------------------------------------------------------
- # read megadump in a loop, stop when datalen < 20
- # -------------------------------------------------------------
- while True:
- try:
- ret = dev.read (0x81, 32, intfData.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on megadump data")
- printhex( ret )
- if ret[31] < 20:
- break
- # step 25
- # read megadump footer (0xc0 0x42 ...)
- try:
- ret = dev.read (0x81, 32, intfData.bInterfaceNumber, 500)
- except:
- sys.exit("Timeout on megadump footer")
- printhex( ret )
- # -------------------------------------------------------------
- # step 28, start server data dump to tracker
- # -------------------------------------------------------------
- # @#@#@# not implemented
- # -------------------------------------------------------------
- # Step 37, disable TX pipe
- # -------------------------------------------------------------
- print ('\nDisable TX pipe...')
- # send 0x03, 0x08, 0x00
- dev.write(0x02, DISABLE_TX_PIPE, intfCtrl.bInterfaceNumber, 32)
- # note response is from data in endpoint
- # expect: 0xc0 0x0b
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 5000)
- except:
- sys.exit("Timeout on disable TX pipe handshake")
- printhex( ret )
- # -------------------------------------------------------------
- # Step 39, terminate link
- # -------------------------------------------------------------
- print ('\nTerminate airlink...')
- # send 0x02, 0x07
- dev.write(0x02, TERM_AIRLINK, intfCtrl.bInterfaceNumber, 32)
- # expect: 0x20, 0x01, "TerminateLink"
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 5000)
- except:
- sys.exit("Timeout on terminate airlink handshake (1)")
- printhex( ret )
- # expect: 0x03, 0x05, 0x16
- try:
- ret = dev.read (0x82, 32, intfCtrl.bInterfaceNumber, 5000)
- except:
- sys.exit("Timeout on terminate airlink handshake (2)")
- printhex( ret )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement