Advertisement
NBK_PlanB

Pi Hole ETL v3

Sep 22nd, 2018
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.79 KB | None | 0 0
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3.  
  4. import sqlite3
  5. from sqlite3 import Error
  6. import mysql.connector as mariadb
  7. import platform
  8. import time
  9. import logging
  10.  
  11. exportLimit = 5000
  12. moreToDo = 1
  13. logger = logging.getLogger(__name__)
  14. logger.setLevel(logging.INFO)
  15. handler = logging.FileHandler('/var/log/pihole-ETL.log')
  16. handler.setLevel(logging.INFO)
  17.  
  18. # create a logging format
  19. formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
  20. handler.setFormatter(formatter)
  21.  
  22. logger.addHandler(handler)
  23.  
  24. def create_ftlconnection(db_file):
  25.     """ create a database connection to the SQLite database
  26.        specified by the db_file
  27.    :param db_file: database file
  28.    :return: Connection object or None
  29.    """
  30.     try:
  31.         conn = sqlite3.connect(db_file)
  32.         conn.execute("PRAGMA read_uncommitted = true;");
  33.         return conn
  34.     except Error as e:
  35.         logger.error(e)
  36.     return None
  37.  
  38. def create_mariaConnection():
  39.     try:
  40.         mysqlconn = mariadb.connect(user='USERNAME', password='PASSWORD', database='pihole', host='SERVER')
  41.         return mysqlconn
  42.     except Error as e:
  43.         logger.error(e)
  44.     return None
  45.  
  46.  
  47. def select_all_tasks(mariaConn):
  48.     """
  49.    Query all rows in the tasks table
  50.    :param conn: the Connection object
  51.    :return:
  52.    """
  53.     lastTimeStamp = 0
  54.     lastId = 0
  55.     hostname = platform.node()
  56.     mCur = mariaConn.cursor(prepared=True)
  57.     mCur.execute("SELECT ifnull(value,0) FROM ftl WHERE hostname = ? AND id = 1",(hostname,))
  58.     ret = mCur.fetchone()
  59.     if ret!=None:
  60.         lastTimeStamp = ret[0]
  61.     mCur.execute("SELECT ifnull(value,0) FROM ftl WHERE hostname = ? AND id = 2", (hostname,))
  62.     ret = mCur.fetchone()
  63.     if ret!=None:
  64.         lastId = ret[0]
  65.     logger.debug("Keys from last run are ID=%s and TimeStamp=%s.",lastId, lastTimeStamp)
  66.     return lastTimeStamp,lastId
  67.  
  68.  
  69. def processData(conn, mariaConn, lastTimeStamp, lastId):
  70.     """
  71.    Get the current timestamp and ID
  72.    Get the data between last and current keys
  73.    """
  74.     global moreToDo
  75.     endId = 0
  76.     endTimeStamp = 0
  77.     sourceID = 0
  78.     timestamp = 0
  79.     ctr = 0
  80.     hostname = platform.node()
  81.  
  82.     mCur = mariaConn.cursor(prepared=True)
  83.     stmt = "INSERT INTO queries (hostname,sourceid,timestamp,type,status,domain,client,forward) VALUES (?,?,?,?,?,?,?,?)"
  84.  
  85.     logger.info("Querying %s rows from Pi Hole database.",exportLimit)
  86.  
  87.     # The query injects the hostname into the results and so into the tuple list to be insested into the central DB
  88.     cur = conn.cursor()
  89.     cur.execute("SELECT ?, id, timestamp, type, status, domain, client, forward FROM queries WHERE id > ? ORDER BY id ASC LIMIT ?", (hostname,lastId,exportLimit))
  90.     rows = cur.fetchall()
  91.     ctr = len(rows)
  92.     # Insert the entire Tuple in one call
  93.     logger.info("Inserting %s rows into destination.",ctr)
  94.     mCur.executemany(stmt,(rows))
  95.     mariaConn.commit()
  96.  
  97.     # Get the values we need from the last row of the tuple
  98.     if ctr > 0:
  99.         endId = rows[ctr-1][1]
  100.         endTimeStamp = rows[ctr-1][2]
  101.     else:
  102.         endId = lastId
  103.         endTimeStamp = lastTimeStamp
  104.     logger.info("Processed %s records.",ctr)
  105.     logger.info("Ending Id pulled was %s",endId)
  106.     if ctr < exportLimit:
  107.         moreToDo = 0
  108.  
  109.     return endTimeStamp, endId
  110.  
  111. def writeTimeStamps(mariaConn, endTimeStamp, endId):
  112.     """
  113.    Update the FTL Table rows with the ending timestamp and end id
  114.    """
  115.     if endId > 0:
  116.         hostname = platform.node()
  117.         mCur = mariaConn.cursor(prepared=True)
  118.         stmt = "INSERT INTO ftl (hostname, id, value) VALUES (?,?,?) ON DUPLICATE KEY UPDATE VALUE=?;"
  119.  
  120.         mCur.execute(stmt,(hostname,1,endTimeStamp,endTimeStamp))
  121.         mCur.execute(stmt,(hostname,2,endId,endId))
  122.         logger.debug("New keys have been written to the database!  ID=%s, TimeStamp=%s",endId,endTimeStamp)
  123.         mariaConn.commit()
  124.  
  125.     return None
  126.  
  127. def main():
  128.     database = "/etc/pihole/pihole-FTL.db"
  129.     lastTimeStamp = 0
  130.     # create a database connection
  131.     conn = create_ftlconnection(database)
  132.     mariaConn = create_mariaConnection()
  133.     with conn:
  134.         logger.info("Starting...  Get last run TimeStamps.")
  135.         lastKeys = select_all_tasks(mariaConn)
  136.         logger.info("Last Processed ID = %s.",lastKeys[1])
  137.         while moreToDo > 0:
  138.             endKeys = processData(conn, mariaConn, lastKeys[0], lastKeys[1])
  139.             writeTimeStamps(mariaConn, endKeys[0],endKeys[1])
  140.             lastKeys = endKeys
  141.             if moreToDo > 0:
  142.                 logger.info("More rows to grab, pausing for 5 Seconds.")
  143.                 time.sleep(5)
  144.         writeTimeStamps(mariaConn, endKeys[0],endKeys[1])
  145.     mariaConn.close()
  146.     conn.close()
  147.     logger.info("Complete")
  148. if __name__ == '__main__':
  149.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement