Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- sys.path.append("./lib")
- import binascii
- import struct
- import pycom
- import socket
- import time
- import _thread
- import ujson as json
- from machine import Pin
- from machine import Timer
- from network import LoRa
- from network import WLAN
- from pysense import Pysense
- from LIS2HH12 import LIS2HH12
- from LTR329ALS01 import LTR329ALS01
- from SI7006A20 import SI7006A20
- from MPL3115A2 import MPL3115A2
- from mqtt import MQTTClient
# LED colours as 24-bit 0xRRGGBB values for pycom.rgbled().
off = 0x000000
red = 0xFF0000
green = 0x00FF00
blue = 0x0000FF
yellow = 0xFFFF00
white = 0xFFFFFF

# Transport currently in use ('wifi' or 'lora'); flipped by switch_mode().
mode = 'wifi'
threadstate = 0

# Boot indication: heartbeat off, greeting on the console, solid red for 3 s.
pycom.heartbeat(False)
print('Hello!')
pycom.rgbled(red)
time.sleep(3)

# LoRaWAN credentials used for the OTAA join (application EUI and key).
# NOTE(review): these look like placeholder values -- confirm before deploying.
app_eui = binascii.unhexlify('AABBCCDDEEFF1122')
app_key = binascii.unhexlify('AABBCCDDEEFF11223344556677889900')

# Pysense board and the sensor drivers read by update_measures().
py = Pysense()
mp = MPL3115A2(py, mode=MPL3115A2.PRESSURE)
si = SI7006A20(py)
lt = LTR329ALS01(py)
li = LIS2HH12(py)

# LoRa radio in LoRaWAN mode; the network join itself is done by
# LoRaNetworkThread.setup().
lora = LoRa(mode=LoRa.LORAWAN)
class SensorValues:
    """Holder for the most recent sensor reading in two encodings.

    Both attributes start as ``None`` and simply store whatever the
    matching setter was last given.
    """

    def __init__(self):
        self.JSON = None
        self.binary = None

    def set_binary(self, bin):
        """Store the binary payload."""
        self.binary = bin

    def set_JSON(self, json_string):
        """Store the JSON text."""
        self.JSON = json_string

    def get_binary(self):
        """Return the stored binary payload (``None`` if never set)."""
        return self.binary

    def get_JSON(self):
        """Return the stored JSON text (``None`` if never set)."""
        return self.JSON


# Single shared instance: written by update_measures(), read by the
# network worker threads.
sensorValues = SensorValues()
class StoppableThread:
    """Base class for a cooperative worker meant to run on a ``_thread``.

    Subclasses override ``setup`` (called once), ``loop`` (called
    repeatedly until ``stop()`` is requested) and ``teardown`` (called once
    on the way out).  ``run`` is the thread entry point.

    State flags:
      * ``is_started()`` -- ``setup`` finished and the loop is running.
      * ``is_stopped()`` -- ``run`` has finished (normally or through an
        exception); callers polling this flag are never left waiting.
    """

    def __init__(self):
        self._running = True
        self._stopped = False
        self._started = False

    def stop(self):
        """Ask the loop to exit after the current iteration."""
        print('Requested thread to stop')
        self._running = False
        self._started = False

    def is_started(self):
        """Return True once ``setup`` has completed and until ``stop()``."""
        return self._started

    def is_stopped(self):
        """Return True once ``run`` has finished."""
        return self._stopped

    def run(self, *args, **kwargs):
        """Thread entry point: setup, loop until stopped, then teardown.

        ``args``/``kwargs`` are forwarded unchanged to ``setup`` and to
        every ``loop`` call.  If ``setup`` or ``loop`` raises, the exception
        still propagates, but ``_stopped`` is set first so that code waiting
        on ``is_stopped()`` does not hang.
        """
        setup_done = False
        try:
            # Unpack so the hooks receive the caller's arguments rather than
            # one tuple and one dict as two positional parameters.
            self.setup(*args, **kwargs)
            setup_done = True
            self._started = True
            while self._running:
                self.loop(*args, **kwargs)
                time.sleep_ms(1)
        finally:
            try:
                # Only tear down what setup() fully created; a half-finished
                # setup may not have the attributes teardown() expects.
                if setup_done:
                    self.teardown()
            finally:
                self._stopped = True
        _thread.exit()

    def loop(self, *args, **kwargs):
        """One unit of work; must be overridden by subclasses."""
        raise NotImplementedError

    def teardown(self):
        """Release resources acquired in ``setup``; default does nothing."""
        return 1

    def setup(self, *args, **kwargs):
        """Acquire resources before the loop starts; default does nothing."""
        return 1
class WifiNetworkThread(StoppableThread):
    """Worker that joins the 'lopy' WiFi network and publishes over MQTT.

    NOTE(review): the WiFi passphrase and MQTT credentials are hard-coded
    here and the passphrase is printed to the console -- confirm that this
    is acceptable for the deployment.
    """

    def setup(self, *args, **kwargs):
        """Bring up the WLAN in station mode and start connecting."""
        self.wlan = WLAN(mode=WLAN.STA, antenna=WLAN.INT_ANT)
        print("LoPy WiFi MAC: " + str(binascii.hexlify(self.wlan.mac())))
        print('Connecting WiFi lopy with pw lopyconnect')
        self.wlan.connect('lopy', auth=(WLAN.WPA2, 'lopyconnect'))

    def teardown(self):
        """Disconnect if connected, then shut the WLAN interface down."""
        if self.wlan.isconnected():
            self.wlan.disconnect()
        self.wlan.deinit()

    def loop(self, *args, **kwargs):
        """Wait for the WiFi link; once up, publish "ON" every 5 seconds."""
        if not self.wlan.isconnected():
            # Still connecting: progress dot, yellow LED, retry in 1 s.
            print('.', end='')
            pycom.rgbled(yellow)
            time.sleep(1)
            return

        pycom.rgbled(off)
        try:
            client = MQTTClient("lopy", "m21.cloudmqtt.com", user="lopyconnect",
                                password="lopyconnect", port=26883, ssl=True)
            client.connect()
            try:
                client.publish(topic="lopy", msg="ON")
            finally:
                # Close the connection even when the publish fails so a
                # failed attempt does not leave the client connected.
                client.disconnect()
        except Exception as exc:
            # Best effort: report and retry on the next iteration.  The
            # exception object is printed directly because MicroPython's
            # sys module does not provide exc_info().
            print("Unexpected error:", repr(exc))
        time.sleep(5)
class LoRaNetworkThread(StoppableThread):
    """Worker that joins the LoRaWAN network and sends the binary payload."""

    def setup(self, *args, **kwargs):
        """Start the OTAA join (``timeout=0``) and print the radio's MAC."""
        self.socket = None
        print('Joining LoRa Network')
        lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), timeout=0)
        print('LoPy LoRa MAC (TTN Devive EUI): ', end='')
        print(binascii.hexlify(lora.mac()))

    def teardown(self):
        """Close the LoRa socket if one was opened."""
        if self.socket is not None:
            self.socket.close()
            self.socket = None

    def loop(self, *args, **kwargs):
        """Advance one step: wait for the join, open the socket, or send."""
        if not lora.has_joined():
            # Join still pending: progress dot, yellow LED, retry in 1 s.
            print('.', end='')
            pycom.rgbled(yellow)
            time.sleep(1)
            return

        if self.socket is None:
            # First iteration after joining: configure the socket once.
            self.socket = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
            self.socket.setsockopt(socket.SOL_LORA, socket.SO_DR, 5)
            self.socket.setblocking(True)
            lora.power_mode(LoRa.TX_ONLY)  # auto sleep if not transmitting
            return

        payload = sensorValues.get_binary()
        if payload is None:
            # No measurement has been stored yet; sending None would raise
            # and kill the worker, so wait for the first reading instead.
            time.sleep(1)
            return

        c = self.socket.send(payload)
        print('Sent ', str(c), ' bytes via LoRa')
        time.sleep(15)
# Start the WiFi worker on its own thread (matches the initial mode 'wifi').
current_network_thread = WifiNetworkThread()
_thread.start_new_thread(current_network_thread.run, ())
def switch_mode(arg):
    """Toggle between the WiFi and LoRa workers (button callback).

    Stops the current worker, waits until it reports stopped, starts the
    other worker and waits until it reports started.  The LED blinks in the
    outgoing mode's colour while stopping and in the incoming mode's colour
    while starting, and is switched off at the end.

    ``arg`` is whatever the pin callback passes in; it is not used.
    """
    global mode
    global current_network_thread

    def blink_until(done, color):
        # Blink off/color at 0.2 s intervals until done() reports True.
        while not done():
            pycom.rgbled(off)
            time.sleep(0.2)
            pycom.rgbled(color)
            time.sleep(0.2)

    leaving_wifi = (mode == 'wifi')

    print(' ')
    print(' ')
    if leaving_wifi:
        print('Switching to LoRa')
        outgoing_color, incoming_color = blue, white
        next_worker_cls, next_mode = LoRaNetworkThread, 'lora'
    else:
        print('Switch to WiFi')
        outgoing_color, incoming_color = white, blue
        next_worker_cls, next_mode = WifiNetworkThread, 'wifi'

    # Shut down the worker that is currently running.
    pycom.rgbled(outgoing_color)
    current_network_thread.stop()
    blink_until(current_network_thread.is_stopped, outgoing_color)

    # Bring up the worker for the other transport.
    current_network_thread = next_worker_cls()
    mode = next_mode
    pycom.rgbled(incoming_color)
    _thread.start_new_thread(current_network_thread.run, ())
    blink_until(current_network_thread.is_started, incoming_color)

    pycom.rgbled(off)
def update_measures(*args, **kwargs):
    """Read all sensors and refresh the shared ``sensorValues`` object.

    Stores two encodings of the same reading:
      * a big-endian packed payload (format ``'>hfhHHiiihhhh'``, 32 bytes)
        via ``sensorValues.set_binary``;
      * a JSON string with the unscaled values via ``sensorValues.set_JSON``.

    Every value except pressure and the two light channels is multiplied by
    100 and truncated to an int before packing; the JSON divides by 100
    again.  Arguments are accepted and ignored so the function can be used
    as a timer callback.
    """
    battery_voltage = py.read_battery_voltage()
    acceleration = li.acceleration()
    light = lt.light()

    struct_params = [
        # NOTE(review): the -5 looks like a calibration offset -- confirm.
        int((mp.temperature() - 5) * 100),  # h: temperature x100
        float(mp.pressure()),               # f: pressure
        int(si.humidity() * 100),           # h: humidity x100
        int(light[0]),                      # H: light channel 0
        int(light[1]),                      # H: light channel 1
        int(acceleration[0] * 100),         # i: acceleration[0] x100
        int(acceleration[1] * 100),         # i: acceleration[1] x100
        int(acceleration[2] * 100),         # i: acceleration[2] x100
        int(li.roll() * 100),               # h: roll x100
        int(li.pitch() * 100),              # h: pitch x100
        int(li.yaw() * 100),                # h: yaw x100
        # Reuse the reading taken above rather than reading the battery a
        # second time.
        int(battery_voltage * 100),         # h: battery voltage x100
    ]

    sensorValues.set_binary(struct.pack('>hfhHHiiihhhh', *struct_params))

    json_string = json.dumps({
        'temperature': struct_params[0] / 100,
        'pressure': struct_params[1],
        'humidity': struct_params[2] / 100,
        'light1': struct_params[3],
        'light2': struct_params[4],
        'acceleration': [struct_params[5] / 100, struct_params[6] / 100, struct_params[7] / 100],
        'roll': struct_params[8] / 100,
        'pitch': struct_params[9] / 100,
        'yaw': struct_params[10] / 100,
        'battery': struct_params[11] / 100
    })
    sensorValues.set_JSON(json_string)
# Mode-switch button on pin P14: a falling edge calls switch_mode().
switch_mode_button = Pin('P14', mode=Pin.IN)
switch_mode_button.callback(Pin.IRQ_FALLING, handler=switch_mode)

# Periodic alarm that calls update_measures() to refresh the sensor values.
Timer.Alarm(update_measures, 5, periodic=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement