Advertisement
eshafik

Untitled

Apr 8th, 2021
812
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.16 KB | None | 0 0
  1. import datetime
  2. import psycopg2
  3. from paho.mqtt import client as mqtt_client
  4.  
  5.  
  6. class MqttScript(object):
  7.     def __init__(self, broker, port, topic, client_id):
  8.         self.broker = broker
  9.         self.port = port
  10.         self.topic = topic
  11.         self.client_id = client_id
  12.  
  13.     @staticmethod
  14.     def on_connect(client, userdata, flags, rc):
  15.         if rc == 0:
  16.             print("Connected to MQTT Broker!")
  17.         else:
  18.             print("Failed to connect, return code %d\n", rc)
  19.  
  20.     def connect_mqtt(self):
  21.         client = mqtt_client.Client(self.client_id)
  22.         # client.username_pw_set(username, password)
  23.         client.on_connect = self.on_connect
  24.         # client.username_pw_set(username='username', password='password')
  25.         client.connect(self.broker, self.port)
  26.         print("--- return client")
  27.         return client
  28.  
  29.     def on_message(self, client, userdata, msg):
  30.         print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
  31.         self.send_data_to_db(msg)
  32.         return True
  33.  
  34.     @staticmethod
  35.     def send_data_to_db(msg):
  36.         db = DBScript(db_name='testdb', host='localhost', db_port='5432', user='shafik', password='shafik')
  37.         db.create_db()
  38.         topic = msg.topic
  39.         data = msg.payload.decode()
  40.         _, device_id, sensor_name = topic.split("/")
  41.         value = float(data)
  42.         db.insert_data(device_id, sensor_name, value, datetime.datetime.now())
  43.  
  44.     def subscribe(self, client: mqtt_client):
  45.         client.subscribe(self.topic)
  46.         client.on_message = self.on_message
  47.  
  48.  
  49. class DBScript(object):
  50.     def __init__(self, db_name, host, db_port, user, password):
  51.         self.conn = None
  52.         self.cursor = None
  53.         self._initial_setup(db_name, host, db_port, user, password)
  54.  
  55.     def _initial_setup(self, db_name, host, db_port, user, password):
  56.         conn = psycopg2.connect(database=db_name, user=user, password=password, host=host, port=db_port)
  57.         self.conn = conn
  58.         self.cursor = conn.cursor()
  59.         return True
  60.  
  61.     def create_db(self):
  62.         sql = '''CREATE TABLE IF NOT EXISTS sensor_data(
  63.            id serial primary key,
  64.            device_id integer ,
  65.            sensor_name varchar (45),
  66.            sensor_value integer ,
  67.            received_at timestamp
  68.            )'''
  69.         self.cursor.execute(sql)
  70.         self.conn.commit()
  71.         return True
  72.  
  73.     def insert_data(self, device_id, sensor_name, value, received_at):
  74.         try:
  75.             self.cursor.execute(
  76.                 "INSERT INTO SENSOR_DATA (device_id, sensor_name, sensor_value, received_at) VALUES (%s, %s, %s, %s)",
  77.                 (device_id, sensor_name, value, received_at))
  78.             self.conn.commit()
  79.             self.cursor_close()
  80.             print("insertion success")
  81.         except Exception as error:
  82.             print("Error insertion: {}".format(error))
  83.  
  84.     def cursor_close(self):
  85.         self.conn.close()
  86.  
  87.  
  88. if __name__ == '__main__':
  89.     mqtt = MqttScript(broker='172.105.37.284', port=8883, topic='sensors/+/+', client_id="server-script_01")
  90.     client = mqtt.connect_mqtt()
  91.     mqtt.subscribe(client)
  92.     client.loop_forever()
  93.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement