Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- import sqlite3
- from sqlite3 import Error
- import mysql.connector as mariadb
- import platform
- import time
- import logging
# Maximum number of rows copied from the Pi-hole database per batch.
exportLimit = 5000
# Loop flag polled by main(); processData() sets it to 0 once a batch
# returns fewer than exportLimit rows.
moreToDo = 1

# create a logging format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# File handler that records INFO and above in the ETL log file.
handler = logging.FileHandler('/var/log/pihole-ETL.log')
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)

# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def create_ftlconnection(db_file):
    """Open the SQLite database at ``db_file`` in read-uncommitted mode.

    :param db_file: path of the SQLite database file
    :return: an open ``sqlite3.Connection``, or ``None`` when connecting or
        configuring the connection fails (the error is logged)
    """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        conn.execute("PRAGMA read_uncommitted = true;")
        return conn
    except Error as e:
        logger.error(e)
        # The connection may already be open when the PRAGMA fails; close it
        # so the caller's ``None`` result does not hide a leaked handle.
        if conn is not None:
            conn.close()
        return None
def create_mariaConnection():
    """Open a connection to the central ``pihole`` MariaDB database.

    The credentials and host below are placeholders that must be edited
    before the script is deployed.

    :return: an open connector connection, or ``None`` when the connection
        attempt fails (the error is logged)
    """
    try:
        mysqlconn = mariadb.connect(user='USERNAME', password='PASSWORD', database='pihole', host='SERVER')
        return mysqlconn
    # The module-level ``Error`` name is sqlite3's exception class, which the
    # MySQL connector never raises; catch the connector's own base error.
    except mariadb.Error as e:
        logger.error(e)
        return None
def select_all_tasks(mariaConn):
    """Read this host's export watermarks from the central ``ftl`` table.

    Two rows are looked up for the local hostname: ``id = 1`` supplies the
    last exported timestamp and ``id = 2`` the last exported query id.

    :param mariaConn: open connection to the central MariaDB database
    :return: tuple ``(lastTimeStamp, lastId)``; an element stays 0 when its
        row does not exist
    """
    lastTimeStamp = 0
    lastId = 0
    hostname = platform.node()
    mCur = mariaConn.cursor(prepared=True)
    try:
        mCur.execute("SELECT ifnull(value,0) FROM ftl WHERE hostname = ? AND id = 1", (hostname,))
        ret = mCur.fetchone()
        if ret is not None:
            lastTimeStamp = ret[0]
        mCur.execute("SELECT ifnull(value,0) FROM ftl WHERE hostname = ? AND id = 2", (hostname,))
        ret = mCur.fetchone()
        if ret is not None:
            lastId = ret[0]
    finally:
        # Release the server-side prepared statement held by the cursor.
        mCur.close()
    logger.debug("Keys from last run are ID=%s and TimeStamp=%s.",lastId, lastTimeStamp)
    return lastTimeStamp, lastId
def processData(conn, mariaConn, lastTimeStamp, lastId):
    """Copy the next batch of rows from the local database to the central one.

    Selects up to ``exportLimit`` rows from the local ``queries`` table with
    an id greater than ``lastId`` (ascending by id), inserts them into the
    central ``queries`` table with the local hostname as the first column,
    and commits.

    Side effect: sets the module-level ``moreToDo`` flag to 0 when the batch
    contains fewer than ``exportLimit`` rows.

    :param conn: open SQLite connection to the Pi-hole database
    :param mariaConn: open connection to the central MariaDB database
    :param lastTimeStamp: timestamp watermark of the previous batch
    :param lastId: id watermark of the previous batch
    :return: tuple ``(endTimeStamp, endId)`` taken from the last copied row,
        or the incoming watermarks unchanged when no rows were found
    """
    global moreToDo
    hostname = platform.node()
    stmt = "INSERT INTO queries (hostname,sourceid,timestamp,type,status,domain,client,forward) VALUES (?,?,?,?,?,?,?,?)"

    logger.info("Querying %s rows from Pi Hole database.",exportLimit)
    # The query injects the hostname into the results, so each fetched tuple
    # already has the shape expected by the central INSERT statement.
    cur = conn.cursor()
    try:
        cur.execute("SELECT ?, id, timestamp, type, status, domain, client, forward FROM queries WHERE id > ? ORDER BY id ASC LIMIT ?", (hostname,lastId,exportLimit))
        rows = cur.fetchall()
    finally:
        cur.close()
    ctr = len(rows)

    logger.info("Inserting %s rows into destination.",ctr)
    if rows:
        # Insert the whole batch in one call; skipped entirely when there is
        # nothing to send, so no statement is prepared for an empty batch.
        mCur = mariaConn.cursor(prepared=True)
        try:
            mCur.executemany(stmt, rows)
            mariaConn.commit()
        finally:
            mCur.close()
        # Rows are ordered by id, so the last one carries the new watermarks.
        endId = rows[-1][1]
        endTimeStamp = rows[-1][2]
    else:
        endId = lastId
        endTimeStamp = lastTimeStamp

    logger.info("Processed %s records.",ctr)
    logger.info("Ending Id pulled was %s",endId)
    # A short batch means the source has been drained.
    if ctr < exportLimit:
        moreToDo = 0
    return endTimeStamp, endId
def writeTimeStamps(mariaConn, endTimeStamp, endId):
    """Store the export watermarks for this host in the central ``ftl`` table.

    Row ``id = 1`` receives ``endTimeStamp`` and row ``id = 2`` receives
    ``endId``; existing rows are updated in place. Nothing is written when
    ``endId`` is not positive.

    :param mariaConn: open connection to the central MariaDB database
    :param endTimeStamp: timestamp of the last exported row
    :param endId: id of the last exported row
    :return: ``None``
    """
    if endId <= 0:
        return None

    hostname = platform.node()
    stmt = "INSERT INTO ftl (hostname, id, value) VALUES (?,?,?) ON DUPLICATE KEY UPDATE VALUE=?;"
    mCur = mariaConn.cursor(prepared=True)
    try:
        mCur.execute(stmt,(hostname,1,endTimeStamp,endTimeStamp))
        mCur.execute(stmt,(hostname,2,endId,endId))
        mariaConn.commit()
    finally:
        # Release the server-side prepared statement held by the cursor.
        mCur.close()
    # Logged only after the commit so the message is true when it appears.
    logger.debug("New keys have been written to the database! ID=%s, TimeStamp=%s",endId,endTimeStamp)
    return None
def main():
    """Run one export: copy new Pi-hole queries to the central database.

    Opens both database connections, reads the watermarks of the previous
    run, then copies batches until ``processData`` clears the module-level
    ``moreToDo`` flag, persisting the new watermarks after every batch and
    pausing 5 seconds between batches.

    :raises SystemExit: with status 1 when either database cannot be opened
    """
    database = "/etc/pihole/pihole-FTL.db"

    # create a database connection
    conn = create_ftlconnection(database)
    if conn is None:
        logger.error("Unable to open Pi Hole database %s, aborting.", database)
        raise SystemExit(1)
    try:
        mariaConn = create_mariaConnection()
        if mariaConn is None:
            logger.error("Unable to connect to the destination database, aborting.")
            raise SystemExit(1)
        try:
            logger.info("Starting... Get last run TimeStamps.")
            lastKeys = select_all_tasks(mariaConn)
            logger.info("Last Processed ID = %s.",lastKeys[1])
            while moreToDo > 0:
                endKeys = processData(conn, mariaConn, lastKeys[0], lastKeys[1])
                # Watermarks are persisted after every batch, so no extra
                # write is needed once the loop ends.
                writeTimeStamps(mariaConn, endKeys[0],endKeys[1])
                lastKeys = endKeys
                if moreToDo > 0:
                    logger.info("More rows to grab, pausing for 5 Seconds.")
                    time.sleep(5)
        finally:
            mariaConn.close()
    finally:
        # Close the SQLite handle even when a batch fails part-way through.
        conn.close()
    logger.info("Complete")
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement