Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- #
- # thanks to @sebageek for bootstrapping
- import argparse
- import logging
- import json
- import paho.mqtt.client as mqtt
- from influxdb import InfluxDBClient
- __VERSION__ = '1.0'
class Collector:
    """Forward selected MQTT readings into an InfluxDB database.

    The collector subscribes to every topic on the broker, converts the
    payload of the topics it knows about (chosen by the last ``/``-separated
    segment of the topic name) and writes each converted value as one point
    to the ``DB_NAME`` database. Topics without a converter are ignored.
    """

    DB_NAME = 'iot'

    # Last topic segments whose payload is parsed as a number.
    # 'toilet_in_use' is deliberately kept numeric, exactly as before, so the
    # type of the values already being written does not change.
    _NUMERIC_TOPICS = frozenset(
        ['temperature', 'humidity', 'amperage', 'wattage', 'toilet_in_use']
    )

    # Payload texts (stripped, lower-cased) that convert_bool treats as False.
    _FALSE_PAYLOADS = frozenset(['', '0', 'false', 'off', 'no'])

    def __init__(self, db_host, mqtt_host, db_password="influxdbpassword", mqtt_password="mqttpassword"):
        """Connect to InfluxDB and prepare (but do not yet connect) the MQTT client.

        Args:
            db_host: Host name of the InfluxDB server (port 8086 is used).
            mqtt_host: Host name of the MQTT broker, used later by run().
            db_password: Password for the InfluxDB "admin" user.
            mqtt_password: Password for the MQTT user "all".
        """
        self._log = logging.getLogger("mqtt")
        self._db_host = db_host
        self._db_password = db_password
        self._server = mqtt_host
        self._user = "all"
        self._influxdb = None
        self._connectInfluxDB()
        self._mqtt = mqtt.Client()
        self._mqtt.username_pw_set(self._user, mqtt_password)
        self._mqtt.on_connect = self._onConnect
        self._mqtt.on_message = self._onMessage

    def _connectInfluxDB(self):
        """Create the InfluxDB client and issue a create_database call for DB_NAME."""
        self._log.info("Connecting to influxdb with pw")
        self._influxdb = InfluxDBClient(
            host=self._db_host,
            port=8086,
            username="admin",
            password=self._db_password,
            database=self.DB_NAME,
        )
        self._influxdb.create_database(self.DB_NAME)

    def run(self):
        """Connect to the MQTT broker and process messages until the loop ends."""
        self._mqtt.connect(self._server)
        self._mqtt.loop_forever()

    def _onConnect(self, client, userdata, flags, rc):
        """MQTT on_connect callback: subscribe to every topic ('#')."""
        self._log.info("Connected to mqtt server, subscribe to all topics ('#')")
        self._mqtt.subscribe("#")
        # self._mqtt.subscribe("topic/foo/bar")

    def _onMessage(self, client, userdata, msg):
        """MQTT on_message callback: convert the payload and store it.

        The converter is picked by get_mapping(); a result of None means the
        topic is not tracked and nothing is written. Any other value,
        including 0 and False, is written as the 'value' field of a point
        whose measurement is the full topic and whose 'type' tag is the last
        topic segment.

        Args:
            client: MQTT client that received the message (unused).
            userdata: User data supplied by the MQTT library (unused).
            msg: Message object providing ``topic`` and ``payload``.
        """
        try:
            data = self.get_mapping(msg.topic)(msg)
            # Only None means "skip": a truthiness test here would also
            # discard legitimate zero readings such as 0.0.
            if data is None:
                return
            data_point = {
                "measurement": msg.topic,
                "tags": {
                    'type': msg.topic.rsplit('/', 1)[-1]
                },
                "fields": {
                    'value': data
                }
            }
            if self._influxdb is not None:
                self._influxdb.write_points([data_point])
                self._log.info("Logged value for %s", data_point)
        except Exception as e:
            # Callback boundary: a bad payload (e.g. non-numeric text) or a
            # failed write is logged instead of propagating to the caller.
            self._log.exception(e)

    @staticmethod
    def convert_raw(msg):
        """Return the UTF-8 payload of ``msg`` parsed as a float.

        Raises:
            ValueError: If the payload text is not a valid float.
            UnicodeDecodeError: If the payload is not valid UTF-8.
        """
        return float(str(msg.payload, encoding='utf-8'))

    @staticmethod
    def convert_bool(msg):
        """Return the UTF-8 payload of ``msg`` interpreted as a boolean.

        Empty text, "0", "false", "off" and "no" (case-insensitive, ignoring
        surrounding whitespace) and any numeric text equal to zero are False.
        Every other non-empty payload is True.

        Raises:
            UnicodeDecodeError: If the payload is not valid UTF-8.
        """
        text = str(msg.payload, encoding='utf-8').strip().lower()
        if text in Collector._FALSE_PAYLOADS:
            return False
        try:
            # Numeric payloads such as "0.0" or "2" follow numeric truthiness.
            return float(text) != 0.0
        except ValueError:
            return True

    @staticmethod
    def _ignore(msg):
        """Converter for untracked topics: always returns None."""
        return None

    def get_mapping(self, topic):
        """Return the payload converter for ``topic``.

        Set which topics need conversion here, to filter/edit data before
        the database write. The choice is made on the last '/'-separated
        segment of the topic.

        Args:
            topic: Full MQTT topic name.

        Returns:
            A callable taking the message and returning the value to store,
            or None when the message should be skipped.
        """
        topic_name = topic.split('/')[-1]
        if topic_name in self._NUMERIC_TOPICS:
            return self.convert_raw
        return self._ignore
def _parser():
    """Build the argument parser for the collector command line.

    The parser accepts two required positionals, ``db_host`` and
    ``mqtt_host``, the optional ``--db_password`` and ``--mqtt_password``
    string options, and a ``--version`` flag that reports ``__VERSION__``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    version_text = "%(prog)s " + __VERSION__

    cli = argparse.ArgumentParser()
    cli.add_argument("--version", action="version", version=version_text)

    # Hosts first, then passwords: same registration order as before so the
    # generated usage/help text is unchanged.
    for argument_name in ("db_host", "mqtt_host", "--db_password", "--mqtt_password"):
        cli.add_argument(argument_name, type=str)

    return cli
if __name__ == '__main__':
    # Script entry point: parse the command line, resolve both passwords and
    # run the collector until the MQTT loop ends.
    logging.basicConfig(level=logging.INFO)
    args = _parser().parse_args()

    # Each password is read from its secrets file when that file can be
    # opened and read; otherwise the value given on the command line (which
    # may be None) is used. The broker secret is resolved before the
    # database secret.
    password_sources = (
        ("mqtt_password", '/run/secrets/broker_password', args.mqtt_password),
        ("db_password", '/run/secrets/database_password', args.db_password),
    )
    passwords = {}
    for keyword, secret_path, cli_value in password_sources:
        try:
            with open(secret_path) as secret_file:
                passwords[keyword] = secret_file.read().strip()
        except IOError:
            passwords[keyword] = cli_value

    srv = Collector(args.db_host, args.mqtt_host, **passwords)
    srv.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement