Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #
- #waage.sh
- #
- #!/bin/bash
- python3 assemble_mandata.py $1 $2 > mdata
- cat mdata
- killall python3
- python3 example-advertisement.py
- #
- #extract_to_int.py:
- #
- #!/usr/bin/env python3
- import sys
- def main():
- offset=int(sys.argv[1])
- if len(sys.argv) < 3:
- to=offset+16
- else:
- to=offset+int(sys.argv[2])
- hex_a="c8a0a0"
- bin_a=to_binary(hex_a)
- sub_str=bin_a[offset:to]
- print(int(sub_str, 2))
- print(hex(int(sub_str,2)))
- def to_binary(value):
- int_val = int("0x"+value,16)
- binry = str(bin(int_val))[2:]
- return binry
- main()
- #
- #assemble_mandata.py
- #
- #!/usr/bin/env python3
- import sys
- def main():
- offset=int(sys.argv[1])
- hex_a="c8a0a0"
- bin_a=to_binary(hex_a)
- bin_b=to_binary(sys.argv[2])
- str_a=str(bin_a)
- str_b=add_zero(str(bin_b), len(sys.argv[2]))
- length=len(str_b)
- out=str_a[0:offset]+str_b+str_a[offset+length:]
- out_int = int(out, 2)
- print(str(hex(out_int))[2:])
- def to_binary(value):
- int_val = int("0x"+value,16)
- binry = str(bin(int_val))[2:]
- return binry
- def add_zero(txt, length):
- length=length*4
- length_txt=len(txt)
- y=length-length_txt
- for x in range(y):
- txt="0"+txt
- return txt
- main()
- #
- #example-advertisement.py
- #
- #!/usr/bin/python
- from __future__ import print_function
- import argparse
- import dbus
- import dbus.exceptions
- import dbus.mainloop.glib
- import dbus.service
- import time
- import threading
- try:
- from gi.repository import GObject # python3
- except ImportError:
- import gobject as GObject # python2
- mainloop = None
- BLUEZ_SERVICE_NAME = 'org.bluez'
- LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
- DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
- DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
- LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
- class InvalidArgsException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
- class NotSupportedException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.NotSupported'
- class NotPermittedException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.NotPermitted'
- class InvalidValueLengthException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
- class FailedException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.Failed'
- class Advertisement(dbus.service.Object):
- PATH_BASE = '/org/bluez/example/advertisement'
- def __init__(self, bus, index, advertising_type):
- f=open("mdata", "r")
- self.md="a0"+str(f.read())[:-1]
- #self.md="a0 c9 d0a0"
- print(self.md)
- self.path = self.PATH_BASE + str(index)
- self.bus = bus
- self.ad_type = advertising_type
- self.service_uuids = None
- self.manufacturer_data = None
- self.solicit_uuids = None
- self.service_data = None
- self.local_name = None
- self.include_tx_power = None
- self.data = None
- dbus.service.Object.__init__(self, bus, self.path)
- def calc_mandata(self):
- bArr = []
- md = self.md
- md = md.replace(" ", "")
- md = md+"0d"
- last_val=0
- for x in range(0, 10, 2):
- hex_as_int=int('0x'+md[x:x+2], 16)
- last_val=last_val+hex_as_int
- bArr.append(hex_as_int)
- checksum = str(hex(last_val))[-2:]
- checksum = int('0x'+checksum,16)
- bArr.append(checksum)
- return bArr
- def print_arr(self, bArr):
- out=""
- for x in bArr:
- strng = str(hex(x)).replace("0x", "")
- if len(strng) == 1:
- strng = "0"+strng
- out=out+" "+strng
- print(out)
- def get_properties(self):
- properties = dict()
- properties['Type'] = self.ad_type
- if self.service_uuids is not None:
- properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
- signature='s')
- if self.solicit_uuids is not None:
- properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
- signature='s')
- if self.manufacturer_data is not None:
- properties['ManufacturerData'] = dbus.Dictionary(
- self.manufacturer_data, signature='qv')
- if self.service_data is not None:
- properties['ServiceData'] = dbus.Dictionary(self.service_data,
- signature='sv')
- if self.local_name is not None:
- properties['LocalName'] = dbus.String(self.local_name)
- if self.include_tx_power is not None:
- properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)
- if self.data is not None:
- properties['Data'] = dbus.Dictionary(
- self.data, signature='yv')
- return {LE_ADVERTISEMENT_IFACE: properties}
- def get_path(self):
- return dbus.ObjectPath(self.path)
- def add_service_uuid(self, uuid):
- if not self.service_uuids:
- self.service_uuids = []
- self.service_uuids.append(uuid)
- def add_solicit_uuid(self, uuid):
- if not self.solicit_uuids:
- self.solicit_uuids = []
- self.solicit_uuids.append(uuid)
- def add_manufacturer_data(self, manuf_code, data):
- if not self.manufacturer_data:
- self.manufacturer_data = dbus.Dictionary({}, signature='qv')
- self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')
- def add_local_name(self, name):
- if not self.local_name:
- self.local_name = ""
- self.local_name = dbus.String(name)
- def add_data(self, ad_type, data):
- if not self.data:
- self.data = dbus.Dictionary({}, signature='yv')
- self.data[ad_type] = dbus.Array(data, signature='y')
- @dbus.service.method(DBUS_PROP_IFACE,
- in_signature='s',
- out_signature='a{sv}')
- def GetAll(self, interface):
- print('GetAll')
- if interface != LE_ADVERTISEMENT_IFACE:
- raise InvalidArgsException()
- print('returning props')
- return self.get_properties()[LE_ADVERTISEMENT_IFACE]
- @dbus.service.method(LE_ADVERTISEMENT_IFACE,
- in_signature='',
- out_signature='')
- def Release(self):
- print('%s: Released!' % self.path)
- class TestAdvertisement(Advertisement):
- def __init__(self, bus, index):
- Advertisement.__init__(self, bus, index, 'peripheral')
- bArr = [0x3a, 0x05, 0xc7, 0xeb, 0x27, 0xb8]
- bArr2 = self.calc_mandata()
- self.print_arr(bArr2)
- bArr.extend(bArr2)
- self.print_arr(bArr)
- self.add_service_uuid('FFB0')
- self.add_manufacturer_data(0xa0ac, bArr)
- self.add_local_name('AAA006')
- self.include_tx_power = True
- def register_ad_cb():
- print('Advertisement registered')
- def register_ad_error_cb(error):
- print('Failed to register advertisement: ' + str(error))
- mainloop.quit()
- def find_adapter(bus):
- remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
- DBUS_OM_IFACE)
- objects = remote_om.GetManagedObjects()
- for o, props in objects.items():
- if LE_ADVERTISING_MANAGER_IFACE in props:
- return o
- return None
- def shutdown(timeout):
- print('Advertising for {} seconds...'.format(timeout))
- time.sleep(timeout)
- mainloop.quit()
- def main(timeout=0):
- global mainloop
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- bus = dbus.SystemBus()
- adapter = find_adapter(bus)
- if not adapter:
- print('LEAdvertisingManager1 interface not found')
- return
- adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
- "org.freedesktop.DBus.Properties")
- adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
- ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
- LE_ADVERTISING_MANAGER_IFACE)
- test_advertisement = TestAdvertisement(bus, 0)
- mainloop = GObject.MainLoop()
- ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {},
- reply_handler=register_ad_cb,
- error_handler=register_ad_error_cb)
- if timeout > 0:
- threading.Thread(target=shutdown, args=(timeout,)).start()
- else:
- print('Advertising forever...')
- mainloop.run() # blocks until mainloop.quit() is called
- ad_manager.UnregisterAdvertisement(test_advertisement)
- print('Advertisement unregistered')
- dbus.service.Object.remove_from_connection(test_advertisement)
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--timeout', default=0, type=int, help="advertise " +
- "for this many seconds then stop, 0=run forever " +
- "(default: 0)")
- args = parser.parse_args()
- main(args.timeout)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement