Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from datetime import datetime, date, time, timedelta
- import datetime
- import mysql.connector
- import pandas as p
class Report(object):
    """Base class for a report that runs one SQL query and writes it to CSV.

    The class-level attributes below all default to ``None``; a concrete
    report is expected to assign ``_query``, ``_columns`` and ``_filename``
    before :meth:`generate_report` is called.

    All date helpers work on naive datetimes built from ``date.today()`` /
    ``datetime.now()``; no timezone conversion is applied anywhere.
    """

    # Free-form text accumulated through append_body_text().
    _body_text = None
    # File name (not path) of the CSV written by write_to_csv().
    _filename = None
    # Column labels applied to the fetched rows in get_data().
    _columns = None
    # SQL statement executed by get_data().
    _query = None

    def __init__(self):
        pass

    def append_body_text(self, value):
        """Append ``value`` to the accumulated body text.

        The first call starts from an empty string; previously the initial
        ``None`` was stringified, so the text always began with "None".
        """
        existing_text = '' if self._body_text is None else str(self._body_text)
        self._body_text = existing_text + value

    @staticmethod
    def connect_to_database(user, password, database, host, port):
        """Open and return a ``mysql.connector`` connection."""
        return mysql.connector.connect(
            user=user, password=password, db=database, host=host, port=port)

    @staticmethod
    def create_string_from_column(serial):
        """Return the items of ``serial`` joined with ", ".

        ``serial`` must provide ``tolist()``; every item is passed through
        ``str``.
        """
        return ', '.join(map(str, serial.tolist()))

    @staticmethod
    def generate_midnight():
        """Return today's date at 00:00:00 as a naive ``datetime``."""
        return datetime.datetime.combine(date.today(), time.min)

    @staticmethod
    def generate_date_today():
        """Return today's date formatted as ``YYYY-MM-DD``."""
        return datetime.date.today().strftime("%Y-%m-%d")

    def generate_date_today_midnight(self):
        """Return the date of today's midnight formatted as ``YYYY-MM-DD``."""
        return self.generate_midnight().strftime("%Y-%m-%d")

    def generate_date_today_midnight_timestamp(self):
        """Return today's midnight minus 1970-01-01 as a ``timedelta``.

        NOTE: unlike generate_date_yesterday_midnight_timestamp(), this
        returns the ``timedelta`` itself rather than a number of seconds.
        The return type is kept for existing callers; call
        ``.total_seconds()`` on the result to get a number.
        """
        beginning_of_unix_epoch = datetime.datetime(1970, 1, 1)
        return self.generate_midnight() - beginning_of_unix_epoch

    def generate_date_yesterday_midnight(self):
        """Return yesterday's date at 00:00:00 as a naive ``datetime``."""
        return self.generate_midnight() - datetime.timedelta(days=1)

    def generate_date_yesterday_midnight_timestamp(self):
        """Return seconds between 1970-01-01 and yesterday's midnight (float)."""
        beginning_of_unix_epoch = datetime.datetime(1970, 1, 1)
        elapsed = self.generate_date_yesterday_midnight() - beginning_of_unix_epoch
        return elapsed.total_seconds()

    @staticmethod
    def generate_yesterday_literal_timestamp():
        """Return seconds between 1970-01-01 and exactly 24 hours ago (float)."""
        a_day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
        return (a_day_ago - datetime.datetime(1970, 1, 1)).total_seconds()

    def get_data(self, connection):
        """Execute ``self._query`` and return the rows as a pandas DataFrame.

        The rows are labelled with ``self._columns``.  ``connection`` is
        closed before returning, as before, and now also when the query
        raises; the cursor is closed as well.
        """
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(self._query)
                results = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            connection.close()
        return p.DataFrame(data=results, columns=self._columns)

    @staticmethod
    def insert_variables_into_sql_statement(query, external_data):
        """Return ``query % external_data``.

        WARNING: this is plain string interpolation with no escaping.  Do
        not pass untrusted values; prefer the driver's parameter binding
        (``cursor.execute(query, params)``) for anything user-supplied.
        """
        return query % external_data

    def write_to_csv(self, data_frame, directory):
        """Write ``data_frame`` to ``directory``/``self._filename`` as CSV.

        The path is built with ``os.path.join`` so ``directory`` works with
        or without a trailing separator.  The index column is not written.
        """
        import os  # local import: the file's top-level imports are left untouched

        target = os.path.join(directory, self._filename)
        data_frame.to_csv(target, sep=',', index=False)

    def generate_report(self, connection, directory, *args):
        """Fetch the report data and write it to a CSV file in ``directory``.

        ``*args`` is accepted but unused.  ``connection`` is always closed,
        including when fetching or writing fails.
        """
        try:
            data = self.get_data(connection)
            self.write_to_csv(data, directory)
        finally:
            # get_data() closes the connection already; this repeat close is
            # kept from the original and also covers overrides of get_data()
            # that leave the connection open.
            connection.close()
- from Report import Report
- from datetime import datetime
class AccessLogsReport(Report):
    """Report of ``AccessLog`` rows whose ``Time`` is later than a cut-off.

    The constructor prepares the query, column labels, CSV file name and a
    body-text line; running the query is left to the ``Report`` base class.
    """

    def __init__(self, database, timestamp):
        """Prepare the access-log query.

        :param database: schema name placed in front of ``AccessLog``.  It
            is interpolated into the SQL text as an identifier, so it must
            come from trusted configuration, never from user input.
        :param timestamp: cut-off as a ``"%Y-%m-%d %H:%M:%S"`` string.
        :raises ValueError: if ``timestamp`` does not match that format.
        """
        # Name this class (not the parent) so Report.__init__ actually runs.
        super(AccessLogsReport, self).__init__()
        self._database = database
        self._timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        # Naive subtraction: the cut-off is converted as if it were UTC.
        beginning_of_unix_epoch = datetime(1970, 1, 1)
        self._unix_timestamp = int((self._timestamp - beginning_of_unix_epoch).total_seconds())
        query_template = """
            SELECT
                *,
                FROM_UNIXTIME(Time)
            FROM {db}.AccessLog as a
            WHERE a.Time > {time};
        """
        # {time} is always an int produced above, so it cannot carry SQL.
        self._query = query_template.format(db=self._database, time=self._unix_timestamp)
        self._columns = ['ID', 'Device_ID', 'Status_ID', 'Unix_Time', 'Number_Events', 'Time']
        today = self.generate_date_today()
        self._filename = 'access_logs_beginning_' + today + '.csv'
        # The pasted source ended this literal with a bare "n"; that looks
        # like a newline escape that lost its backslash, so it is restored.
        self.append_body_text("* Access Logs Report\n")
- import mysql.connector
- import pandas as p
class TestThing(object):
    """Scratch class that runs a fixed access-log query against MySQL."""

    def __init__(self):
        pass

    def eat_shoes(self, user='user', password='password', database='Database2',
                  host='127.0.0.1', port=13307):
        """Run the hard-coded access-log query and return a pandas DataFrame.

        The connection settings were previously fixed inside the method;
        they are now keyword arguments whose defaults are the old values,
        so ``eat_shoes()`` behaves as before.  The query itself still names
        the ``server2`` schema explicitly, whatever ``database`` is.

        Also sets ``self._query`` and ``self._columns`` as a side effect.
        The cursor and connection are closed even when the query raises.
        """
        self._query = """
            SELECT
                *,
                FROM_UNIXTIME(Time)
            FROM server2.AccessLog as a
            WHERE a.Time > 1512507870;
        """
        self._columns = ['ID', 'Device_ID', 'Status_ID', 'Time', 'Number_Events', 'MachineTime']
        connection = mysql.connector.connect(
            user=user, password=password, db=database, host=host, port=port)
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(self._query)
                results = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            connection.close()
        return p.DataFrame(data=results, columns=self._columns)
Add Comment
Please, Sign In to add comment