Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import ubinascii
- import time
- import sys
- from machine import Timer
- import socket
- from network import Bluetooth
- from network import LoRa
- from machine import SD
- import os
- import math
- import gc
- import pycom
- from network import WLAN
# --- Parameters ---------------------------------------------------------------

# Duration of one Bluetooth scan in seconds.
# WARNING: this also affects the LoRa send interval. Default: 300 seconds.
SCAN_DURATION_SEC = 30
ENABLE_LORA = True
# Set to True when using an external antenna (Bluetooth).
EXT_ANTENNA = True
# Enable Bluetooth scanning for SCAN_DURATION_SEC, after which a LoRa message
# is sent to the backend.
ENABLE_SCANNING = True
# The SD card has to be formatted as either FAT16 or FAT32.
sd_enabled = True

# LoRaWAN OTAA credentials. The values below are redacted placeholders and
# have to be replaced with real hexadecimal strings before use.
dev_eui = ubinascii.unhexlify('xxxxxxxxxxxxx')
app_eui = ubinascii.unhexlify('xxxxxxxxxxxxx')
app_key = ubinascii.unhexlify('xxxxxxxxxxxxxxxxxxxxxxxxxx')

# Bit width used when hex-encoding a single RSSI value.
CHAR_SIZE = 8

# Limitation of the LoRa radio transmission under the given LoRa parameters.
LORA_PAYLOAD_MAX_SIZE = 52
# Largest multiple of 7 that does not exceed the LoRa payload limit (= 49).
PAYLOAD_MAX_SIZE = (LORA_PAYLOAD_MAX_SIZE // 7) * 7

# Slice bounds applied to the stringified hex dump of an advertisement
# (see scanToBuffer). The fields are adjacent: UUID, then major, then minor.
UUID_PAYLOAD_LOWER_BOUND = 20
UUID_PAYLOAD_UPPER_BOUND = UUID_PAYLOAD_LOWER_BOUND + 32  # 52
MAJOR_LOWER_BOUND = UUID_PAYLOAD_UPPER_BOUND              # 52
MAJOR_UPPER_BOUND = MAJOR_LOWER_BOUND + 4                 # 56
MINOR_LOWER_BOUND = MAJOR_UPPER_BOUND                     # 56
MINOR_UPPER_BOUND = MINOR_LOWER_BOUND + 4                 # 60
# Helper functions
def pad_hex_string(s):
    """Left-pad a hex string with a single zero so that its length is even.

    Accepts both ``bytes``-like hex strings (the original behaviour) and
    ``str`` hex strings; the result has the same kind as the input.

    Args:
        s: Hexadecimal digits as ``bytes`` or ``str``.

    Returns:
        ``s`` unchanged when its length is already even, otherwise ``s``
        prefixed with one ``0`` digit.
    """
    if len(s) % 2 == 0:
        return s
    # Pick a prefix of the matching type; bytes + str would raise TypeError.
    zero = "0" if isinstance(s, str) else b"0"
    return zero + s
def standard_variance(values):
    """Return the sample standard deviation (n - 1 denominator) of ``values``.

    Args:
        values: Sequence of numbers.

    Returns:
        The square root of the sum of squared deviations from the mean
        divided by ``len(values) - 1``. For fewer than two values the
        deviation is undefined, so ``0.0`` is returned instead of raising
        ``ZeroDivisionError``.
    """
    count = len(values)
    if count < 2:
        # A single RSSI sample (or none) has no spread.
        return 0.0
    mean = sum(values) / count
    squared_deviations = sum((x - mean) ** 2 for x in values)
    return math.sqrt(squared_deviations / (count - 1))
def avg(values):
    """Return the arithmetic mean of ``values``.

    The elements are accumulated in iteration order and the total is divided
    by ``len(values)``; an empty sequence therefore raises
    ``ZeroDivisionError``.
    """
    total = 0
    for value in values:
        total = total + value
    return total / len(values)
def s16(value):
    """Interpret the low 8 bits of ``value`` as a signed two's-complement number.

    Despite the name, only bits 0-7 are considered: bit 7 is the sign bit and
    bits 0-6 carry the magnitude, so the result lies in ``-128 .. 127``.
    """
    low_byte = value & 0xFF
    if low_byte & 0x80:
        # Sign bit set: map 0x80..0xFF onto -128..-1.
        return low_byte - 0x100
    return low_byte
def tohex(val, nbits):
    """Return ``val`` as a fixed-width two's-complement hex string.

    Args:
        val: Integer to encode; negative values wrap modulo ``2 ** nbits``.
        nbits: Width of the two's-complement representation in bits.

    Returns:
        Lower-case hex digits without the ``0x`` prefix, left-padded with
        zeros to the number of digits needed for ``nbits`` bits. The padding
        keeps the result usable as a fixed-size field: without it a small
        non-negative value such as ``5`` would yield the single digit ``'5'``
        and break ``unhexlify`` on the concatenated payload.
    """
    modulus = 1 << nbits
    digits = hex((val + modulus) % modulus)[2:]
    width = (nbits + 3) // 4  # hex digits required for nbits bits
    # Manual padding: str.zfill is not available on every MicroPython build.
    return "0" * (width - len(digits)) + digits
def isInTransmitBuffer(mac):
    """Return the index of the entry for ``mac`` in the global transmit buffer.

    Each buffer entry is looked up by its ``"mac"`` key. The position of the
    first match is returned, or ``-1`` when no entry matches.
    """
    for position, entry in enumerate(transmit_buffer):
        if entry["mac"] == mac:
            return position
    return -1
def rssi_for_mac_existing(mac):
    """Tell whether RSSI samples have been recorded for the beacon ``mac``.

    Args:
        mac: Beacon MAC in the same form as stored under the ``"mac"`` key of
            the transmit buffer entries.

    Returns:
        ``True`` when the global transmit buffer holds an entry for ``mac``
        whose ``"rssi_list"`` is non-empty, otherwise ``False``.
    """
    # Buffer entries are dicts keyed by "mac" / "rssi_list" (as built in
    # scanToBuffer), so they must be indexed rather than accessed by attribute.
    for entry in transmit_buffer:
        if entry["mac"] == mac and entry["rssi_list"]:
            return True
    return False
def blink(color_hex1, color_hex2, ntimes, sleep_between_sec):
    """
    Blink the RGB LED to visualise internal states when no console is available.

    For each of the ``ntimes`` repetitions the LED is set to ``color_hex1``
    and then to ``color_hex2``, sleeping ``sleep_between_sec`` seconds after
    each colour change. The LED is left showing ``color_hex2``.
    """
    for _ in range(ntimes):
        for color in (color_hex1, color_hex2):
            pycom.rgbled(color)
            time.sleep(sleep_between_sec)
def segment_payload(payload, packet_max_size=PAYLOAD_MAX_SIZE):
    """
    Split a payload into consecutive chunks that respect the LoRaWAN size limit.

    Args:
        payload: Sliceable payload (e.g. ``bytes``).
        packet_max_size: Maximum length of a single chunk; must be positive.

    Returns:
        List of slices of ``payload`` in order, each at most
        ``packet_max_size`` long; an empty payload yields an empty list.

    Raises:
        ValueError: If ``packet_max_size`` is not positive (the previous
            implementation looped forever in that case).
    """
    if packet_max_size <= 0:
        raise ValueError("packet_max_size must be a positive integer")
    return [
        payload[start:start + packet_max_size]
        for start in range(0, len(payload), packet_max_size)
    ]
def scanToBuffer(dur_sec):  # the former alarm parameter has been removed
    """Scan for BLE advertisements and collect RSSI samples per beacon.

    Starts a Bluetooth scan of ``dur_sec`` seconds and polls advertisements
    until the scan has finished. Advertisements whose extracted UUID equals
    one of the two accepted values are stored in the global
    ``transmit_buffer``: one dict per beacon MAC with the keys ``"mac"``,
    ``"rssi_list"``, ``"major"``, ``"minor"`` and ``"num"`` (number of
    recorded samples). Start and end of the scan are appended to
    ``/sd/log.txt`` when SD logging is enabled. The LED shows 0x0000CC while
    scanning and is switched off afterwards.

    Args:
        dur_sec: Scan duration handed to ``bluetooth.start_scan``.
    """
    # UUIDs of the beacons that are tracked. Earlier revisions matched
    # 'fda50693a4e24fb1afcfc6eb07647825' / 'e2c56db5dffb48d2b060d0f5a71096e0'
    # instead (noted in the original as Axaet PC062 and Minew i3 / e9 beacons).
    accepted_uuids = (
        'fda50693a4e24fb1afcfc6eb0764affe',
        'e2c56db5dffb48d2b060d0f5a710affe',
    )

    pycom.rgbled(0x0000CC)
    print("Scanning to buffer...")
    bluetooth.start_scan(dur_sec)
    if sd_enabled:
        # 'with' guarantees the log file is closed even if the write fails.
        with open('/sd/log.txt', 'a') as log_file:
            log_file.write("\n" + str(time.time()) + " Free Memory: " + str(gc.mem_free() / 1024) + " Kilobyte --- Starting BLE Scanning...")

    while bluetooth.isscanning():
        adv = bluetooth.get_adv()
        if not adv:
            continue

        # str() of the hexlified bytes keeps the b'...' wrapper, so the slice
        # bounds below also count those two leading characters.
        # NOTE(review): the *_BOUND constants appear tuned to that; do not
        # switch to .decode() without adjusting them.
        adata = str(ubinascii.hexlify(adv.data))
        uuid = adata[UUID_PAYLOAD_LOWER_BOUND:UUID_PAYLOAD_UPPER_BOUND]
        if uuid not in accepted_uuids:
            continue

        major = adata[MAJOR_LOWER_BOUND:MAJOR_UPPER_BOUND]
        minor = adata[MINOR_LOWER_BOUND:MINOR_UPPER_BOUND]
        ble_mac = ubinascii.hexlify(adv.mac)

        for obj in transmit_buffer:
            if obj["mac"] == ble_mac:
                # Known beacon: just add the new RSSI data point.
                print(".", end="")
                obj["rssi_list"].append(adv.rssi)
                obj["num"] += 1
                break
        else:
            # Unknown beacon: create a new data object. "num" starts at 1
            # because the first RSSI sample is stored right away.
            transmit_buffer.append({
                "mac": ble_mac,
                "rssi_list": [adv.rssi],
                "major": major,
                "minor": minor,
                "num": 1,
            })
            print("New Beacon found, added to transmit buffer")
            blink(0x000000, 0x0000FF, 1, 0.03)

    pycom.rgbled(0x000000)
    if sd_enabled:
        with open('/sd/log.txt', 'a') as log_file:
            log_file.write("\n" + str(time.time()) + " Finished Scanning! --- Free Memory:" + str(gc.mem_free() / 1024) + " Kilobyte")
def _log_to_sd(message):
    """Append a timestamped ``message`` to /sd/log.txt if SD logging is enabled."""
    if sd_enabled:
        with open('/sd/log.txt', 'a') as log_file:
            log_file.write("\n" + str(time.time()) + message)


def _value_at_fraction(sorted_values, fraction):
    """Return the element at ``round(len * fraction) - 1``, clamped to index 0."""
    position = round(len(sorted_values) * fraction) - 1
    return sorted_values[max(position, 0)]


def _send_payload(payload):
    """Send one payload over the LoRa socket and report the result via console, LED and SD log."""
    print("Sending:", payload, end=" ")
    print("of Length:", len(payload), "Bytes")
    try:
        _log_to_sd(" About to send :" + str(ubinascii.hexlify(payload)))
        print("lorawan has joined status:", lora.has_joined())
        s.send(payload)
        print("Success!")
        _log_to_sd(" Sent lorawan packet:" + str(ubinascii.hexlify(payload)))
        blink(0x26734d, 0x000000, 5, 0.1)  # pinewood green for success
    except OSError as err:
        # A failed payload is reported and dropped; there is no retry.
        print("OS error: {0}".format(err))
        blink(0xFF0000, 0x000000, 30, 0.1)
        _log_to_sd(" Sending failed: {0}".format(err))


def print_rssi_list_and_send():
    """Print per-beacon RSSI statistics of the last cycle and send them via LoRa.

    For every entry of the global ``transmit_buffer`` that received samples,
    the sorted RSSI list and its min / quartile / median / max values are
    printed. The beacon MAC followed by the hex-encoded maximum RSSI is
    appended to the send buffer, and the entry's samples and counter are
    reset. Entries without samples in this cycle are skipped. The hex buffer
    is converted to bytes, split with ``segment_payload`` and each chunk is
    sent, unless ``ENABLE_LORA`` is false. If no beacon contributed, the
    single byte ``00`` is sent.
    """
    print("\n*********** RESULTS OF CYCLE **************")
    hex_parts = []
    for obj in transmit_buffer:
        rssi_list = obj["rssi_list"]
        if not rssi_list:
            # Beacon known from an earlier cycle but not seen in this one;
            # indexing the empty list would raise IndexError.
            continue

        print("\n")
        print(" ", obj["mac"], "Number of measurements in total:", obj["num"])

        # Sort in place, then read the order statistics.
        rssi_list.sort()
        print(" after sorting:", rssi_list)
        min_value = rssi_list[0]
        max_value = rssi_list[-1]
        median = _value_at_fraction(rssi_list, 0.5)
        # NOTE(review): names and printed labels are kept from the original;
        # "q1" is taken at the 75 % position and "q3" at the 25 % position of
        # the ascending list -- confirm the intended naming.
        q1 = _value_at_fraction(rssi_list, 0.75)
        q3 = _value_at_fraction(rssi_list, 0.25)
        print(" min:", min_value, "q3:", q3, "median:", median, "q1:", q1, "max:", max_value)

        # Only the MAC and the maximum RSSI are transmitted.
        rssi_hex = str(tohex(max_value, CHAR_SIZE))
        if len(rssi_hex) % 2:
            # Keep the overall hex string even-length so unhexlify accepts it.
            rssi_hex = "0" + rssi_hex
        hex_parts.append(obj["mac"].decode("utf-8"))
        hex_parts.append(rssi_hex)

        # Start the next cycle with a clean slate for this beacon.
        rssi_list.clear()
        obj["num"] = 0

    lora_send_buffer = "".join(hex_parts)
    if not lora_send_buffer:
        # Send 00 as payload, because empty payloads are not forwarded by lorIOT.
        lora_send_buffer = "00"
    length = len(lora_send_buffer)
    print("\n Lora Send Buffer:", lora_send_buffer, "Length:", length)
    lora_send_bytes = ubinascii.unhexlify(lora_send_buffer)

    if ENABLE_LORA:
        for payload in segment_payload(lora_send_bytes):
            _send_payload(payload)
    else:
        print("Not sending, lora function is disabled")
def lora_tx_cb(lora):
    """LoRa event callback that reports whether the transmit-failed flag is set.

    Reads the pending events from ``lora`` and prints one of two messages
    depending on whether ``LoRa.TX_FAILED_EVENT`` is contained in them.
    """
    pending_events = lora.events()
    print("Lora Tx callback invoked")
    if not (pending_events & LoRa.TX_FAILED_EVENT):
        print(" --- Tx-Callback: Transmit Failed Mask has NOT been set!")
        return
    print('--- Tx-Callback: Transmit Failed Mask has been set')
# Disable the heartbeat so the LED can be used for custom signalling
pycom.heartbeat(False)

# Initialize the SD card for logging
sd = None  # forward declaration so the name exists globally even if mounting fails
f = None
if sd_enabled:
    try:  # try mounting
        sd = SD()
        os.mount(sd, '/sd')
        # 'with' closes the log file even if the write fails.
        with open('/sd/log.txt', 'a') as f:
            f.write("\n" + str(time.time()) + ' --- Device Booted ---')
        print("written initial sequence to SD card")
        # NOTE(review): the original had a bare `blink` expression here, which
        # never called the function; add an explicit blink(...) call if a
        # visual confirmation is wanted.
    except OSError as err:
        print('Error mounting SD Card: ', err)
        # Fall back to running without SD logging.
        sd_enabled = False
# Disable WiFi, it is enabled by default
wlan = WLAN(mode=WLAN.STA)
wlan.deinit()

# LoRa initialization
lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868, adr=True)
# NOTE(review): this second init() passes neither region nor adr -- confirm
# that it does not reset the values given to the constructor above.
lora.init(mode=LoRa.LORAWAN, power_mode=LoRa.ALWAYS_ON)
# NOTE(review): the callback is only triggered by TX_PACKET_EVENT although
# lora_tx_cb checks TX_FAILED_EVENT -- confirm the trigger mask is intended.
lora.callback(trigger=LoRa.TX_PACKET_EVENT, handler=lora_tx_cb)
s = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
s.setsockopt(socket.SOL_LORA, socket.SO_DR, 0)
s.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, False)
s.setblocking(True)  # only important when receiving and for waiting the 2 receive windows after sending

# Join the network
if ENABLE_LORA:
    if not lora.has_joined():
        lora.join(activation=LoRa.OTAA, auth=(dev_eui, app_eui, app_key), timeout=0, dr=0)
        trying_counter = 0
        # Poll every 3 seconds until the join has completed.
        while not lora.has_joined():
            time.sleep(3)
            trying_counter += 3
            print('Not yet joined, have been trying for', trying_counter, 'seconds...')
            blink(0x0000FF, 0x000000, 1, 0.2)
    print("Connected to LoRaWAN Network!")
    if sd_enabled:
        # The original referenced f.close without calling it, so the file
        # was never closed; the context manager closes it reliably.
        with open('/sd/log.txt', 'a') as f:
            f.write("\n" + str(time.time()) + ' joined lorawan network')
    blink(0x00FF00, 0x000000, 10, 0.1)
# Bluetooth initialization
print("Initializing Bluetooth...", end="")
transmit_buffer = []  # one dict per known beacon, filled by scanToBuffer
bluetooth = Bluetooth()
if EXT_ANTENNA is not True:
    bluetooth.init()
else:
    # External antenna: re-create and initialise the driver accordingly.
    bluetooth = Bluetooth(antenna=Bluetooth.EXT_ANT)
    bluetooth.init(antenna=Bluetooth.EXT_ANT)
print("Done!")

# Print the available RAM, for debugging purposes only
print("--- Free Mem before main loop:", gc.mem_free() / 1024, " Kilobytes ---")

# Main loop: scan, report and send, then print the remaining memory.
# NOTE(review): the source had lost its indentation; the nesting below is a
# reconstruction -- confirm whether the send step belongs inside the
# ENABLE_SCANNING branch.
running = True
while running:
    if ENABLE_SCANNING:
        scanToBuffer(SCAN_DURATION_SEC)
    print_rssi_list_and_send()
    print("Remaining Memory: ", gc.mem_free() / 1024, "Kilobytes of RAM")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement