Advertisement
Guest User

Untitled

a guest
Sep 9th, 2020
502
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.31 KB | None | 0 0
  1. #
  2. #waage.sh
  3. #
  4. #!/bin/bash
  5. python3 assemble_mandata.py $1 $2 > mdata
  6. cat mdata
  7. killall python3
  8. python3 example-advertisement.py
  9.  
  10.  
  11. #
  12. #extract_to_int.py:
  13. #
  14. #!/usr/bin/env python3
  15. import sys
  16.  
  17. def main():
  18.     offset=int(sys.argv[1])
  19.     if len(sys.argv) < 3:
  20.        to=offset+16
  21.     else:
  22.        to=offset+int(sys.argv[2])
  23.     hex_a="c8a0a0"
  24.     bin_a=to_binary(hex_a)
  25.     sub_str=bin_a[offset:to]
  26.     print(int(sub_str, 2))
  27.     print(hex(int(sub_str,2)))
  28.    
  29.  
  30. def to_binary(value):
  31.     int_val = int("0x"+value,16)
  32.     binry = str(bin(int_val))[2:]
  33.     return binry
  34.    
  35. main()
  36.  
  37. #
  38. #assemble_mandata.py
  39. #
  40. #!/usr/bin/env python3
  41. import sys
  42.  
  43. def main():
  44.   offset=int(sys.argv[1])
  45.   hex_a="c8a0a0"
  46.   bin_a=to_binary(hex_a)
  47.   bin_b=to_binary(sys.argv[2])
  48.   str_a=str(bin_a)
  49.   str_b=add_zero(str(bin_b), len(sys.argv[2]))
  50.   length=len(str_b)
  51.   out=str_a[0:offset]+str_b+str_a[offset+length:]
  52.   out_int = int(out, 2)
  53.   print(str(hex(out_int))[2:])
  54.  
  55. def to_binary(value):
  56.   int_val = int("0x"+value,16)
  57.   binry = str(bin(int_val))[2:]
  58.   return binry
  59.  
  60. def add_zero(txt, length):
  61.   length=length*4
  62.   length_txt=len(txt)
  63.   y=length-length_txt
  64.   for x in range(y):
  65.     txt="0"+txt
  66.   return txt
  67.  
  68.  
  69. main()
  70.  
  71. #
  72. #example-advertisement.py
  73. #
  74. #!/usr/bin/python
  75.  
  76. from __future__ import print_function
  77.  
  78. import argparse
  79. import dbus
  80. import dbus.exceptions
  81. import dbus.mainloop.glib
  82. import dbus.service
  83. import time
  84. import threading
  85.  
  86.  
  87. try:
  88.     from gi.repository import GObject  # python3
  89. except ImportError:
  90.     import gobject as GObject  # python2
  91.  
  92. mainloop = None
  93.  
  94. BLUEZ_SERVICE_NAME = 'org.bluez'
  95. LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
  96. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  97. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  98.  
  99. LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
  100.  
  101.  
  102. class InvalidArgsException(dbus.exceptions.DBusException):
  103.     _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
  104.  
  105.  
  106. class NotSupportedException(dbus.exceptions.DBusException):
  107.     _dbus_error_name = 'org.bluez.Error.NotSupported'
  108.  
  109.  
  110. class NotPermittedException(dbus.exceptions.DBusException):
  111.     _dbus_error_name = 'org.bluez.Error.NotPermitted'
  112.  
  113.  
  114. class InvalidValueLengthException(dbus.exceptions.DBusException):
  115.     _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
  116.  
  117.  
  118. class FailedException(dbus.exceptions.DBusException):
  119.     _dbus_error_name = 'org.bluez.Error.Failed'
  120.  
  121.  
  122. class Advertisement(dbus.service.Object):
  123.     PATH_BASE = '/org/bluez/example/advertisement'
  124.  
  125.     def __init__(self, bus, index, advertising_type):
  126.         f=open("mdata", "r")
  127.         self.md="a0"+str(f.read())[:-1]
  128.         #self.md="a0 c9 d0a0"
  129.         print(self.md)
  130.         self.path = self.PATH_BASE + str(index)
  131.         self.bus = bus
  132.         self.ad_type = advertising_type
  133.         self.service_uuids = None
  134.         self.manufacturer_data = None
  135.         self.solicit_uuids = None
  136.         self.service_data = None
  137.         self.local_name = None
  138.         self.include_tx_power = None
  139.         self.data = None
  140.         dbus.service.Object.__init__(self, bus, self.path)
  141.        
  142.     def calc_mandata(self):
  143.         bArr = []
  144.         md = self.md
  145.         md = md.replace(" ", "")
  146.         md = md+"0d"
  147.         last_val=0
  148.         for x in range(0, 10, 2):
  149.             hex_as_int=int('0x'+md[x:x+2], 16)
  150.             last_val=last_val+hex_as_int
  151.             bArr.append(hex_as_int)
  152.         checksum = str(hex(last_val))[-2:]
  153.         checksum = int('0x'+checksum,16)
  154.         bArr.append(checksum)
  155.         return bArr
  156.    
  157.     def print_arr(self, bArr):
  158.         out=""
  159.         for x in bArr:
  160.             strng = str(hex(x)).replace("0x", "")
  161.             if len(strng) == 1:
  162.                 strng = "0"+strng
  163.             out=out+" "+strng
  164.         print(out)
  165.  
  166.     def get_properties(self):
  167.         properties = dict()
  168.         properties['Type'] = self.ad_type
  169.         if self.service_uuids is not None:
  170.             properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
  171.                                                     signature='s')
  172.         if self.solicit_uuids is not None:
  173.             properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
  174.                                                     signature='s')
  175.         if self.manufacturer_data is not None:
  176.             properties['ManufacturerData'] = dbus.Dictionary(
  177.                 self.manufacturer_data, signature='qv')
  178.         if self.service_data is not None:
  179.             properties['ServiceData'] = dbus.Dictionary(self.service_data,
  180.                                                         signature='sv')
  181.         if self.local_name is not None:
  182.             properties['LocalName'] = dbus.String(self.local_name)
  183.         if self.include_tx_power is not None:
  184.             properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)
  185.  
  186.         if self.data is not None:
  187.             properties['Data'] = dbus.Dictionary(
  188.                 self.data, signature='yv')
  189.         return {LE_ADVERTISEMENT_IFACE: properties}
  190.  
  191.     def get_path(self):
  192.         return dbus.ObjectPath(self.path)
  193.  
  194.     def add_service_uuid(self, uuid):
  195.         if not self.service_uuids:
  196.             self.service_uuids = []
  197.         self.service_uuids.append(uuid)
  198.  
  199.     def add_solicit_uuid(self, uuid):
  200.         if not self.solicit_uuids:
  201.             self.solicit_uuids = []
  202.         self.solicit_uuids.append(uuid)
  203.  
  204.     def add_manufacturer_data(self, manuf_code, data):
  205.         if not self.manufacturer_data:
  206.             self.manufacturer_data = dbus.Dictionary({}, signature='qv')
  207.         self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')
  208.  
  209.     def add_local_name(self, name):
  210.         if not self.local_name:
  211.             self.local_name = ""
  212.         self.local_name = dbus.String(name)
  213.  
  214.     def add_data(self, ad_type, data):
  215.         if not self.data:
  216.             self.data = dbus.Dictionary({}, signature='yv')
  217.         self.data[ad_type] = dbus.Array(data, signature='y')
  218.  
  219.     @dbus.service.method(DBUS_PROP_IFACE,
  220.                          in_signature='s',
  221.                          out_signature='a{sv}')
  222.     def GetAll(self, interface):
  223.         print('GetAll')
  224.         if interface != LE_ADVERTISEMENT_IFACE:
  225.             raise InvalidArgsException()
  226.         print('returning props')
  227.         return self.get_properties()[LE_ADVERTISEMENT_IFACE]
  228.  
  229.     @dbus.service.method(LE_ADVERTISEMENT_IFACE,
  230.                          in_signature='',
  231.                          out_signature='')
  232.     def Release(self):
  233.         print('%s: Released!' % self.path)
  234.  
  235.  
  236. class TestAdvertisement(Advertisement):
  237.    
  238.        
  239.     def __init__(self, bus, index):
  240.         Advertisement.__init__(self, bus, index, 'peripheral')
  241.         bArr = [0x3a, 0x05, 0xc7, 0xeb, 0x27, 0xb8]
  242.         bArr2 = self.calc_mandata()
  243.         self.print_arr(bArr2)
  244.         bArr.extend(bArr2)
  245.         self.print_arr(bArr)
  246.         self.add_service_uuid('FFB0')
  247.         self.add_manufacturer_data(0xa0ac, bArr)
  248.         self.add_local_name('AAA006')
  249.         self.include_tx_power = True
  250.  
  251.  
  252. def register_ad_cb():
  253.     print('Advertisement registered')
  254.  
  255.  
  256. def register_ad_error_cb(error):
  257.     print('Failed to register advertisement: ' + str(error))
  258.     mainloop.quit()
  259.  
  260.  
  261. def find_adapter(bus):
  262.     remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
  263.                                DBUS_OM_IFACE)
  264.     objects = remote_om.GetManagedObjects()
  265.  
  266.     for o, props in objects.items():
  267.         if LE_ADVERTISING_MANAGER_IFACE in props:
  268.             return o
  269.  
  270.     return None
  271.  
  272.  
  273. def shutdown(timeout):
  274.     print('Advertising for {} seconds...'.format(timeout))
  275.     time.sleep(timeout)
  276.     mainloop.quit()
  277.  
  278.  
  279. def main(timeout=0):
  280.     global mainloop
  281.  
  282.     dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  283.  
  284.     bus = dbus.SystemBus()
  285.  
  286.     adapter = find_adapter(bus)
  287.     if not adapter:
  288.         print('LEAdvertisingManager1 interface not found')
  289.         return
  290.  
  291.     adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
  292.                                    "org.freedesktop.DBus.Properties")
  293.  
  294.     adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
  295.  
  296.     ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
  297.                                 LE_ADVERTISING_MANAGER_IFACE)
  298.  
  299.     test_advertisement = TestAdvertisement(bus, 0)
  300.  
  301.     mainloop = GObject.MainLoop()
  302.  
  303.     ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
  304.                                      reply_handler=register_ad_cb,
  305.                                      error_handler=register_ad_error_cb)
  306.  
  307.     if timeout > 0:
  308.         threading.Thread(target=shutdown, args=(timeout,)).start()
  309.     else:
  310.         print('Advertising forever...')
  311.  
  312.     mainloop.run()  # blocks until mainloop.quit() is called
  313.  
  314.     ad_manager.UnregisterAdvertisement(test_advertisement)
  315.     print('Advertisement unregistered')
  316.     dbus.service.Object.remove_from_connection(test_advertisement)
  317.  
  318.  
  319. if __name__ == '__main__':
  320.     parser = argparse.ArgumentParser()
  321.     parser.add_argument('--timeout', default=0, type=int, help="advertise " +
  322.                         "for this many seconds then stop, 0=run forever " +
  323.                         "(default: 0)")
  324.     args = parser.parse_args()
  325.  
  326.     main(args.timeout)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement