eshafik

Untitled

Apr 8th, 2021
627
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import datetime
  2. import psycopg2
  3. from paho.mqtt import client as mqtt_client
  4.  
  5.  
  6. class MqttScript(object):
  7.     def __init__(self, broker, port, topic, client_id):
  8.         self.broker = broker
  9.         self.port = port
  10.         self.topic = topic
  11.         self.client_id = client_id
  12.  
  13.     @staticmethod
  14.     def on_connect(client, userdata, flags, rc):
  15.         if rc == 0:
  16.             print("Connected to MQTT Broker!")
  17.         else:
  18.             print("Failed to connect, return code %d\n", rc)
  19.  
  20.     def connect_mqtt(self):
  21.         client = mqtt_client.Client(self.client_id)
  22.         # client.username_pw_set(username, password)
  23.         client.on_connect = self.on_connect
  24.         # client.username_pw_set(username='username', password='password')
  25.         client.connect(self.broker, self.port)
  26.         print("--- return client")
  27.         return client
  28.  
  29.     def on_message(self, client, userdata, msg):
  30.         print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
  31.         self.send_data_to_db(msg)
  32.         return True
  33.  
  34.     @staticmethod
  35.     def send_data_to_db(msg):
  36.         db = DBScript(db_name='testdb', host='localhost', db_port='5432', user='shafik', password='shafik')
  37.         db.create_db()
  38.         topic = msg.topic
  39.         data = msg.payload.decode()
  40.         _, device_id, sensor_name = topic.split("/")
  41.         value = float(data)
  42.         db.insert_data(device_id, sensor_name, value, datetime.datetime.now())
  43.  
  44.     def subscribe(self, client: mqtt_client):
  45.         client.subscribe(self.topic)
  46.         client.on_message = self.on_message
  47.  
  48.  
  49. class DBScript(object):
  50.     def __init__(self, db_name, host, db_port, user, password):
  51.         self.conn = None
  52.         self.cursor = None
  53.         self._initial_setup(db_name, host, db_port, user, password)
  54.  
  55.     def _initial_setup(self, db_name, host, db_port, user, password):
  56.         conn = psycopg2.connect(database=db_name, user=user, password=password, host=host, port=db_port)
  57.         self.conn = conn
  58.         self.cursor = conn.cursor()
  59.         return True
  60.  
  61.     def create_db(self):
  62.         sql = '''CREATE TABLE IF NOT EXISTS sensor_data(
  63.            id serial primary key,
  64.            device_id integer ,
  65.            sensor_name varchar (45),
  66.            sensor_value integer ,
  67.            received_at timestamp
  68.            )'''
  69.         self.cursor.execute(sql)
  70.         self.conn.commit()
  71.         return True
  72.  
  73.     def insert_data(self, device_id, sensor_name, value, received_at):
  74.         try:
  75.             self.cursor.execute(
  76.                 "INSERT INTO SENSOR_DATA (device_id, sensor_name, sensor_value, received_at) VALUES (%s, %s, %s, %s)",
  77.                 (device_id, sensor_name, value, received_at))
  78.             self.conn.commit()
  79.             self.cursor_close()
  80.             print("insertion success")
  81.         except Exception as error:
  82.             print("Error insertion: {}".format(error))
  83.  
  84.     def cursor_close(self):
  85.         self.conn.close()
  86.  
  87.  
  88. if __name__ == '__main__':
  89.     mqtt = MqttScript(broker='172.105.37.284', port=8883, topic='sensors/+/+', client_id="server-script_01")
  90.     client = mqtt.connect_mqtt()
  91.     mqtt.subscribe(client)
  92.     client.loop_forever()
  93.  
RAW Paste Data

Adblocker detected! Please consider disabling it...

We've detected AdBlock Plus or some other adblocking software preventing Pastebin.com from fully loading.

We don't have any obnoxious sound, or popup ads, we actively block these annoying types of ads!

Please add Pastebin.com to your ad blocker whitelist or disable your adblocking software.

×