Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import psycopg2
- import yaml
- import re
- import pandas as pd
- import csv
- import sys
- import re
- from ast import literal_eval
- from psycopg2.extensions import AsIs
- def get_credentials(server, path):
- """Reads credentials.
- :type server: str
- :param server: options are 'features', 'wpmpg01', 'wpad02'.
- :type path: str
- :param path: path to credentials.
- :rtype: dict
- :return: dict with credentials if successfull, otherwise - exception text.
- """
- exception_text = None
- with open(path, 'rb') as handle:
- config = yaml.safe_load(handle.read())
- try:
- dbase = config[server]['base']
- host = config[server]['server']
- user = config[server]['user']
- password = config[server]['pass']
- port = config[server]['port']
- except:
- exception_text = 'ERROR: Incorrect server name, try: \'features\' \
- , \'wpmpg01\', \'wpad02\' or config is empty'
- print(exception_text)
- return exception_text
- return {'server': dbase, 'host': host, 'user': user,
- 'password': password, 'port': port}
- def open_connect_to_db(server, path):
- """Connects to specific database.
- :type server: str
- :param server: options are 'features', 'wpmpg01', 'wpad02'.
- :type path: str
- :param path: path to credentials.
- :rtype: psycopg2.extensions.cursor
- :return: psycopg2.extensions.cursor if successfull, \
- otherwise - exception text.
- """
- credentials = get_credentials(server, path)
- try:
- connection = psycopg2.connect(
- "dbname=" + credentials['server'] +
- " user=" + credentials['user'] +
- " password=" + credentials['password'] +
- " host=" + credentials['host'] +
- " port=" + credentials['port'])
- cur = connection.cursor()
- except psycopg2.Error as e:
- exception_text = e.diag.message_primary
- print('ERROR: ' + exception_text)
- return exception_text
- return cur
- def set_path_to_dblink(cur):
- """Creates path to dblink.
- :type cur: psycopg2.extensions.cursor
- :param cur: cursor to database to which dblink will be created.
- :rtype: nothing
- :return: nothing if successfull, otherwise - exception text.
- """
- exception_text = None
- try:
- cur.execute('SET search_path TO options_analytics, public;')
- except psycopg2.Error as e:
- exception_text = e.diag.message_primary
- print('ERROR: ' + exception_text)
- return exception_text
- def execute_query(cur, query):
- """Executes query.
- :type cur: psycopg2.extensions.cursor
- :param cur: cursor to database.
- :type query: str
- :param cur: text of query to execute.
- :rtype: list
- :return: list with result of query if successfull, \
- otherwise - exception text.
- """
- exception_text = None
- try:
- cur.execute(query)
- cur.connection.commit()
- except psycopg2.Error as e:
- exception_text = e.diag.message_primary
- print('ERROR: ' + exception_text)
- return exception_text
- data = cur.fetchall()
- return data
- def execute_query_with_dblink(cur, query, server):
- """Executes query with dblink.
- :type cur: psycopg2.extensions.cursor
- :param cur: cursor to current database.
- :type query: str
- :param cur: text of query to execute with connection string \
- in dblink like this dbname=dbname host=host \
- user=user password=password port=port.
- :type server: str
- :param server: options are 'features', 'wpmpg01', \
- 'wpad02', server to connect to.
- :rtype: nothing
- :return: nothing if successfull, otherwise - exception text.
- """
- exception_text = None
- credentials = get_credentials(server)
- try:
- query = re.sub('dbname=dbname', 'dbname=' +
- credentials['server'], query)
- query = re.sub('host=host', 'host=' + credentials['host'], query)
- query = re.sub('user=user', 'user=' + credentials['user'], query)
- query = re.sub('password=password', 'password=' +
- credentials['password'], query)
- query = re.sub('port=port', 'port=' + credentials['port'], query)
- cur.execute(query)
- cur.connection.commit()
- except psycopg2.Error as e:
- exception_text = e.diag.message_primary
- print('ERROR: ' + exception_text)
- return exception_text
- def read_sql(path):
- """Reads SQL query.
- :type path: str
- :param path: path to SQL query to read.
- :rtype: nothing
- :return: nothing if successfull, otherwise - standard error.
- """
- with open(path, 'r') as sql_file:
- query = sql_file.read()
- return query
- def write_to_stdout(path, data, columns_id_to_int):
- """Writes to stdout.
- :type path: str
- :param path: path to stdout to write to.
- :type data: list
- :param data: data to write.
- :type columns_to_int: list
- :param columns_to_int: list of columns to convert to integer.
- :rtype: nothing
- :return: nothing if successfull, otherwise - standard error.
- """
- data = pd.DataFrame(data)
- for i in columns_id_to_int:
- data[data.columns[i]] = data[data.columns[i]].fillna(-1).astype(int)
- data.to_csv(path, header=False, index=False,
- quoting=csv.QUOTE_NONNUMERIC,
- float_format='%g', quotechar='\'', na_values=None)
- def read_from_stdout_and_insert(path, cur, table):
- """Reads from stdout and inserts into destination table.
- :type cur: psycopg2.extensions.cursor
- :param cur: cursor to database.
- :type path: str
- :param path: path to stdout to read from.
- :type table: str
- :param table: destination table name.
- :rtype: nothing
- :return: nothing if successfull, otherwise - standard error.
- """
- with open(path, 'r') as file_handler:
- for line in file_handler:
- line = re.sub("''", 'None', line)
- line = eval(line)
- try:
- cur.execute('INSERT INTO %s VALUES %s', (AsIs(table), line))
- except psycopg2.Error as e:
- exception_text = e.diag.message_primary
- print('ERROR: ' + exception_text)
- cur.connection.commit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement