Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from umqtt.simple import MQTTClient
- from machine import Pin, I2C
- from bme280 import *
- import ujson
- import time
- class DataCollector:
- def __init__(self, topic, server, scl, sda):
- self.topic = topic
- self.server = server
- self.scl = scl
- self.sda = sda
- self.__i2c = I2C(scl=Pin(self.scl), sda=Pin(self.sda))
- self.__address = self.__i2c.scan()[0]
- self.bme = BME280(i2c=self.__i2c, address=self.__address)
- self.led = self.initialize_led()
- def read_sensor(self):
- raw_data = dict(zip(["temperature", "pressure", "humidity"], self.bme.raw_values))
- return raw_data
- def publish_data(self, client_name, interval=10):
- c = MQTTClient(client_name, self.server)
- c.connect()
- while True:
- data = self.read_sensor()
- data["client"] = client_name
- c.publish(self.topic, ujson.dumps(data))
- self.blink()
- time.sleep(interval)
- def initialize_led(self):
- led = Pin(2 ,Pin.OUT)
- for i in range(3):
- led.on()
- time.sleep(0.1)
- led.off()
- time.sleep(0.1)
- led.on()
- return led
- def blink(self):
- self.led.off()
- time.sleep(0.1)
- self.led.on()
- if __name__ == "__main__":
- data_collector = DataCollector("environment", "rpi-z0", 5, 4)
- data_collector.publish_data(client_name="esp8266", interval=1)
Add Comment
Please, Sign In to add comment