Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- import psycopg2
- from paho.mqtt import client as mqtt_client
class MqttScript(object):
    """Receive MQTT sensor messages and hand each reading to ``DBScript``.

    A message is stored only when its topic has exactly three
    ``/``-separated levels (the second is used as the device id, the third
    as the sensor name) and its payload is text that ``float()`` accepts.
    Anything else is reported on stdout and skipped.
    """

    # Keyword arguments passed to ``DBScript`` for every stored message.
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a configuration file instead.
    DB_CONFIG = {
        "db_name": "testdb",
        "host": "localhost",
        "db_port": "5432",
        "user": "shafik",
        "password": "shafik",
    }

    def __init__(self, broker, port, topic, client_id):
        """Remember the broker address, the topic filter and the client id.

        Nothing is connected here; call ``connect_mqtt`` for that.
        """
        self.broker = broker
        self.port = port
        self.topic = topic
        self.client_id = client_id

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """Report the outcome of a connection attempt on stdout.

        ``rc`` equal to 0 is treated as success; any other value is printed
        as the failure code.
        """
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # The code is formatted into the message; the earlier version
            # passed it as a second print() argument, leaving "%d" verbatim.
            print(f"Failed to connect, return code {rc}\n")

    def connect_mqtt(self):
        """Create a client, register ``on_connect`` and connect to the broker.

        Returns:
            The connected ``mqtt_client.Client``. The caller still has to
            start its network loop.
        """
        # client_id is passed by keyword.
        # NOTE(review): paho-mqtt 2.x is believed to have changed the first
        # positional parameter of Client() - confirm against the installed
        # version.
        client = mqtt_client.Client(client_id=self.client_id)
        # No credentials are configured; if the broker requires them, call
        # client.username_pw_set(...) here, before connect().
        client.on_connect = self.on_connect
        client.connect(self.broker, self.port)
        print("--- return client")
        return client

    def on_message(self, client, userdata, msg):
        """Log a received message and forward it to ``send_data_to_db``.

        Always returns ``True``.
        """
        # errors="replace" keeps this log line from raising on payloads
        # that are not valid UTF-8; validation happens in send_data_to_db.
        text = msg.payload.decode(errors="replace")
        print(f"Received `{text}` from `{msg.topic}` topic")
        self.send_data_to_db(msg)
        return True

    @staticmethod
    def _parse_message(topic, payload):
        """Split a topic/payload pair into (device_id, sensor_name, value).

        Raises:
            ValueError: if the topic does not have exactly three levels, or
                the payload cannot be decoded or converted to ``float``.
        """
        levels = topic.split("/")
        if len(levels) != 3:
            raise ValueError(f"expected 3 topic levels, got {len(levels)}")
        _, device_id, sensor_name = levels
        # UnicodeDecodeError is a subclass of ValueError, so an undecodable
        # payload is reported through the same exception type.
        return device_id, sensor_name, float(payload.decode())

    @staticmethod
    def send_data_to_db(msg):
        """Validate ``msg`` and insert it as one row through ``DBScript``.

        Malformed messages are reported on stdout and dropped without
        opening a database connection. For valid messages a new
        ``DBScript`` is created, the table is ensured to exist and the
        reading is inserted with the current local time.
        """
        try:
            device_id, sensor_name, value = MqttScript._parse_message(
                msg.topic, msg.payload)
        except ValueError as error:
            # Raising here would propagate out of the MQTT callback, so a
            # single bad message is skipped instead.
            print(f"Ignoring malformed message on `{msg.topic}`: {error}")
            return

        # One DBScript per message, because insert_data() closes the
        # connection when it finishes.
        db = DBScript(**MqttScript.DB_CONFIG)
        db.create_db()
        db.insert_data(device_id, sensor_name, value, datetime.datetime.now())

    def subscribe(self, client: "mqtt_client.Client"):
        """Route incoming messages to ``on_message`` and subscribe to the topic.

        NOTE(review): the subscription is requested once, here. If the
        client reconnects with a clean session the broker may no longer
        hold it; consider re-subscribing from ``on_connect`` - confirm.
        """
        # Install the handler first so that no delivered message can arrive
        # before a callback exists.
        client.on_message = self.on_message
        client.subscribe(self.topic)
class DBScript(object):
    """Thin wrapper around one psycopg2 connection for the sensor_data table.

    The connection is opened in the constructor and closed by
    ``insert_data`` (or ``cursor_close``), so an instance is good for a
    single insert.
    """

    def __init__(self, db_name, host, db_port, user, password):
        """Open a connection and a cursor using the given settings."""
        self.conn = None
        self.cursor = None
        self._initial_setup(db_name, host, db_port, user, password)

    def _initial_setup(self, db_name, host, db_port, user, password):
        """Connect to the database and store the connection and a cursor.

        Returns ``True``; a failed connection attempt raises from
        ``psycopg2.connect``.
        """
        conn = psycopg2.connect(database=db_name, user=user, password=password, host=host, port=db_port)
        self.conn = conn
        self.cursor = conn.cursor()
        return True

    def create_db(self):
        """Create the ``sensor_data`` table if it does not exist yet.

        Despite the name, this creates a table, not a database. Because of
        ``IF NOT EXISTS`` an already existing table keeps whatever column
        types it has.

        Returns ``True`` once the statement has been committed.
        """
        # sensor_value is a floating-point column: the subscriber stores
        # float readings, which an integer column would silently round.
        sql = '''CREATE TABLE IF NOT EXISTS sensor_data(
            id serial primary key,
            device_id integer ,
            sensor_name varchar (45),
            sensor_value double precision ,
            received_at timestamp
        )'''
        self.cursor.execute(sql)
        self.conn.commit()
        return True

    def insert_data(self, device_id, sensor_name, value, received_at):
        """Insert one reading, commit it, and close the connection.

        Errors are reported on stdout instead of being raised. The
        connection is closed whether or not the insert succeeded.

        Returns:
            ``True`` if the row was committed, ``False`` otherwise.
        """
        try:
            # Values are bound as query parameters, never formatted into
            # the SQL text.
            self.cursor.execute(
                "INSERT INTO SENSOR_DATA (device_id, sensor_name, sensor_value, received_at) VALUES (%s, %s, %s, %s)",
                (device_id, sensor_name, value, received_at))
            self.conn.commit()
            print("insertion success")
            return True
        except Exception as error:
            print("Error insertion: {}".format(error))
            return False
        finally:
            # Close on the failure path too; the earlier version closed
            # only after a successful commit and leaked the connection
            # whenever execute() or commit() raised.
            self.cursor_close()

    def cursor_close(self):
        """Close the database connection (the name is historical).

        Does nothing when there is no connection or it is already closed,
        so it is safe to call more than once.
        """
        if self.conn is not None and not self.conn.closed:
            self.conn.close()
def main():
    """Connect to the broker, subscribe, and process messages until stopped.

    Ctrl-C ends the loop and disconnects from the broker.
    """
    # NOTE(review): '172.105.37.284' is not a valid IPv4 address (the last
    # octet exceeds 255), so connecting will fail until the real broker
    # address is filled in. It is left unchanged because the intended
    # address cannot be determined from this file.
    # NOTE(review): 8883 is conventionally the MQTT-over-TLS port, but no
    # TLS setup is visible in connect_mqtt - confirm the broker accepts
    # plain connections on this port.
    mqtt = MqttScript(broker='172.105.37.284', port=8883, topic='sensors/+/+', client_id="server-script_01")
    client = mqtt.connect_mqtt()
    mqtt.subscribe(client)
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        # Leave the broker cleanly instead of dying with a traceback.
        client.disconnect()


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement