Advertisement
NBK_PlanB

PiHole ETL v2

Sep 21st, 2018
171
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.05 KB | None | 0 0
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3.  
  4. import sqlite3
  5. from sqlite3 import Error
  6. import mysql.connector as mariadb
  7. import platform
  8. import time
  9.  
  10. exportLimit = 10000  # maximum number of rows copied per batch (bound to the LIMIT of the SQLite SELECT in processData)
  11. moreToDo = 1  # loop flag: main() keeps running batches while this is > 0; processData() sets it to 0
  12.  
  13. def create_connection(db_file):
  14.     """ create a database connection to the SQLite database
  15.        specified by the db_file
  16.    :param db_file: database file
  17.    :return: Connection object or None
  18.    """
  19.     try:
  20.         conn = sqlite3.connect(db_file)
  21.         conn.execute("PRAGMA read_uncommitted = true;");
  22.         return conn
  23.     except Error as e:
  24.         print(e)
  25.     return None
  26.  
  27. def create_mariaConnection():
  28.     try:
  29.         mysqlconn = mariadb.connect(user='pi', password='sabaac', database='pihole', host='castaway-dock.local')
  30.         return mysqlconn
  31.     except Error as e:
  32.         print(e)
  33.     return None
  34.  
  35.  
  36. def select_all_tasks(mariaConn):
  37.     """
  38.    Query all rows in the tasks table
  39.    :param conn: the Connection object
  40.    :return:
  41.    """
  42.     lastTimeStamp = 0
  43.     lastId = 0
  44.     hostname = platform.node()
  45.     mCur = mariaConn.cursor(prepared=True)
  46.     mCur.execute("SELECT ifnull(value,0) FROM ftl WHERE hostname = ? AND id = 1",(hostname,))
  47.     ret = mCur.fetchone()
  48.     if ret!=None:
  49.         lastTimeStamp = ret[0]
  50.     mCur.execute("SELECT ifnull(value,0) FROM ftl WHERE hostname = ? AND id = 2", (hostname,))
  51.     ret = mCur.fetchone()
  52.     if ret!=None:
  53.         lastId = ret[0]
  54.     #print (lastId)
  55.     return lastTimeStamp,lastId
  56.  
  57.  
  58. def processData(conn, mariaConn, lastTimeStamp, lastId):
  59.     """
  60.    Get the current timestamp and ID
  61.    Get the data between last and current keys
  62.    """
  63.     global moreToDo
  64.     endId = 0
  65.     endTimeStamp = 0
  66.     sourceID = 0
  67.     timestamp = 0
  68.     ctr = 0
  69.     hostname = platform.node()
  70.  
  71.     mCur = mariaConn.cursor(prepared=True)
  72.     stmt = "INSERT INTO queries (hostname,sourceid,timestamp,type,status,domain,client,forward) VALUES (?,?,?,?,?,?,?,?)"
  73.  
  74.     cur = conn.cursor()
  75.     cur.execute("SELECT id, timestamp, type, status, domain, client, forward FROM queries where ID > ? LIMIT ?", (lastId,exportLimit))
  76.     rows = cur.fetchall()
  77.  
  78.     for row in rows:
  79.         ctr+=1
  80.         sourceID = row[0]
  81.         timestamp = row[1]
  82.         type = row[2]
  83.         status = row[3]
  84.         domain = row[4]
  85.         client = row[5]
  86.         forward = row[6]
  87.         #print(hostname,sourceID,timestamp,type,status,domain,client,forward)
  88.         mCur.execute(stmt,(hostname,sourceID,timestamp,type,status,domain,client,forward))
  89.  
  90.     mariaConn.commit()
  91.     endId = sourceID
  92.     endTimeStamp = timestamp
  93.     print ("Processed %s records, pausing for 5 seconds." % (ctr))
  94.     if (endId - lastId) < exportLimit:
  95.         moreToDo = 0
  96.  
  97.     return endTimeStamp, endId
  98.  
  99. def writeTimeStamps(mariaConn, endTimeStamp, endId):
  100.     """
  101.    Update the FTL Table rows with the ending timestamp and end id
  102.    """
  103.     if endId > 0:
  104.         hostname = platform.node()
  105.         mCur = mariaConn.cursor(prepared=True)
  106.         stmt = "INSERT INTO ftl (hostname, id, value) VALUES (?,?,?) ON DUPLICATE KEY UPDATE VALUE=?;"
  107.  
  108.         mCur.execute(stmt,(hostname,1,endTimeStamp,endTimeStamp))
  109.         mCur.execute(stmt,(hostname,2,endId,endId))
  110.         mariaConn.commit()
  111.  
  112.     return None
  113.  
  114. def main():
  115.     database = "/etc/pihole/pihole-FTL.db"
  116.     lastTimeStamp = 0
  117.     # create a database connection
  118.     conn = create_connection(database)
  119.     mariaConn = create_mariaConnection()
  120.     with conn:
  121.         print("Get last run TimeStamps.")
  122.         lastKeys = select_all_tasks(mariaConn)
  123.         print ("Last Processed ID = %s." % (lastKeys[1]))
  124.         while moreToDo > 0:
  125.             endKeys = processData(conn, mariaConn, lastKeys[0], lastKeys[1])
  126.             writeTimeStamps(mariaConn, endKeys[0],endKeys[1])
  127.             lastKeys = endKeys
  128.             print ("More Work? %s!" % (moreToDo))
  129.             if moreToDo > 0:
  130.                 time.sleep(5)
  131.     writeTimeStamps(mariaConn, endKeys[0],endKeys[1])
  132.     conn.close()
  133.     mariaConn.close()
  134.     print ("Complete")
  135. if __name__ == '__main__':  # run the export only when executed as a script, not when imported
  136.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement