Advertisement
Guest User

Untitled

a guest
Mar 9th, 2019
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.52 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. #
  4. # thanks to @sebageek for bootstrapping
  5.  
  6. import argparse
  7. import logging
  8. import json
  9. import paho.mqtt.client as mqtt
  10. from influxdb import InfluxDBClient
  11.  
  12. __VERSION__ = '1.0'
  13.  
  14. class Collector:
  15. DB_NAME = 'iot'
  16.  
  17. def __init__(self, db_host, mqtt_host, db_password="influxdbpassword", mqtt_password="mqttpassword"):
  18. self._log = logging.getLogger("mqtt")
  19.  
  20. self._db_host = db_host
  21. self._db_password = db_password
  22. self._server = mqtt_host
  23. self._user = "all"
  24.  
  25. self._influxdb = None
  26. self._connectInfluxDB()
  27.  
  28. self._mqtt = mqtt.Client()
  29. self._mqtt.username_pw_set(self._user, mqtt_password)
  30. self._mqtt.on_connect = self._onConnect
  31. self._mqtt.on_message = self._onMessage
  32.  
  33.  
  34. def _connectInfluxDB(self):
  35. self._log.info("Connecting to influxdb with pw")
  36. self._influxdb = InfluxDBClient(host=self._db_host, port=8086, username="admin", password=self._db_password, database=self.DB_NAME)
  37. self._influxdb.create_database(self.DB_NAME)
  38.  
  39. def run(self):
  40. self._mqtt.connect(self._server)
  41. self._mqtt.loop_forever()
  42.  
  43. def _onConnect(self, client, userdata, flags, rc):
  44. self._log.info("Connected to mqtt server, subscribe to all topics ('#')")
  45. self._mqtt.subscribe("#")
  46. # self._mqtt.subscribe("topic/foo/bar")
  47.  
  48. def _onMessage(self, client, userdata, msg):
  49. try:
  50. data = self.get_mapping(msg.topic)(msg)
  51. if not data:
  52. return
  53.  
  54. data_point = {
  55. "measurement": msg.topic,
  56. "tags": {
  57. 'type': msg.topic.rsplit('/', 1)[-1]
  58. },
  59. "fields": {
  60. 'value': data
  61. }
  62. }
  63.  
  64.  
  65. if self._influxdb and data_point:
  66. self._influxdb.write_points([data_point])
  67. self._log.info("Logged value for %s" % data_point)
  68. except Exception as e:
  69. self._log.exception(e)
  70.  
  71. @staticmethod
  72. def convert_raw(msg):
  73. return float(str(msg.payload, encoding='utf-8'))
  74.  
  75. @staticmethod
  76. def convert_bool(msg):
  77. return bool(str(msg.payload, encoding='utf-8'))
  78.  
  79. def get_mapping(self, topic):
  80. # set which topics need conversion here, to filter/edit data before db-write
  81. topic_name = topic.split('/')[-1]
  82. if topic_name in ['temperature', 'humidity', 'amperage', 'wattage']:
  83. return self.convert_raw
  84. if topic_name == "toilet_in_use":
  85. return self.convert_raw
  86.  
  87. return lambda f: None
  88.  
  89. def _parser():
  90. parser = argparse.ArgumentParser()
  91. parser.add_argument("--version", action="version", version="%(prog)s " + __VERSION__)
  92. parser.add_argument("db_host", type=str)
  93. parser.add_argument("mqtt_host", type=str)
  94. parser.add_argument("--db_password", type=str)
  95. parser.add_argument("--mqtt_password", type=str)
  96.  
  97. return parser
  98.  
  99.  
  100. if __name__ == '__main__':
  101. logging.basicConfig(level=logging.INFO)
  102.  
  103. parser = _parser()
  104. args = parser.parse_args()
  105.  
  106. try:
  107. with open('/run/secrets/broker_password') as f:
  108. mqtt_password = f.read().strip()
  109. except IOError:
  110. mqtt_password=args.mqtt_password
  111.  
  112. try:
  113. with open('/run/secrets/database_password') as f:
  114. db_password = f.read().strip()
  115. except IOError:
  116. db_password=args.db_password
  117.  
  118. srv = Collector(args.db_host, args.mqtt_host, db_password=db_password, mqtt_password=mqtt_password)
  119. srv.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement