Advertisement
Guest User

Code example

a guest
May 18th, 2018
185
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.37 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. import psycopg2
  3. import yaml
  4. import re
  5. import pandas as pd
  6. import csv
  7. import sys
  8. import re
  9. from ast import literal_eval
  10. from psycopg2.extensions import AsIs
  11.  
  12.  
  13. def get_credentials(server, path):
  14.     """Reads credentials.
  15.  
  16.    :type server: str
  17.    :param server: options are 'features', 'wpmpg01', 'wpad02'.
  18.  
  19.    :type path: str
  20.    :param path: path to credentials.
  21.  
  22.    :rtype: dict
  23.    :return: dict with credentials if successfull, otherwise - exception text.
  24.    """
  25.  
  26.     exception_text = None
  27.     with open(path, 'rb') as handle:
  28.         config = yaml.safe_load(handle.read())
  29.     try:
  30.         dbase = config[server]['base']
  31.         host = config[server]['server']
  32.         user = config[server]['user']
  33.         password = config[server]['pass']
  34.         port = config[server]['port']
  35.     except:
  36.         exception_text = 'ERROR: Incorrect server name, try: \'features\' \
  37.        , \'wpmpg01\', \'wpad02\' or config is empty'
  38.         print(exception_text)
  39.         return exception_text
  40.  
  41.     return {'server': dbase, 'host': host, 'user': user,
  42.             'password': password, 'port': port}
  43.  
  44.  
  45. def open_connect_to_db(server, path):
  46.     """Connects to specific database.
  47.  
  48.    :type server: str
  49.    :param server: options are 'features', 'wpmpg01', 'wpad02'.
  50.  
  51.    :type path: str
  52.    :param path: path to credentials.
  53.  
  54.    :rtype: psycopg2.extensions.cursor
  55.    :return: psycopg2.extensions.cursor if successfull, \
  56.    otherwise - exception text.
  57.    """
  58.  
  59.     credentials = get_credentials(server, path)
  60.     try:
  61.         connection = psycopg2.connect(
  62.             "dbname=" + credentials['server'] +
  63.             " user=" + credentials['user'] +
  64.             " password=" + credentials['password'] +
  65.             " host=" + credentials['host'] +
  66.             " port=" + credentials['port'])
  67.         cur = connection.cursor()
  68.     except psycopg2.Error as e:
  69.         exception_text = e.diag.message_primary
  70.         print('ERROR: ' + exception_text)
  71.         return exception_text
  72.  
  73.     return cur
  74.  
  75.  
  76. def set_path_to_dblink(cur):
  77.     """Creates path to dblink.
  78.  
  79.    :type cur: psycopg2.extensions.cursor
  80.    :param cur: cursor to database to which dblink will be created.
  81.  
  82.    :rtype: nothing
  83.    :return: nothing if successfull, otherwise - exception text.
  84.    """
  85.  
  86.     exception_text = None
  87.     try:
  88.         cur.execute('SET search_path TO options_analytics, public;')
  89.     except psycopg2.Error as e:
  90.         exception_text = e.diag.message_primary
  91.         print('ERROR: ' + exception_text)
  92.         return exception_text
  93.  
  94.  
  95. def execute_query(cur, query):
  96.     """Executes query.
  97.  
  98.    :type cur: psycopg2.extensions.cursor
  99.    :param cur: cursor to database.
  100.  
  101.    :type query: str
  102.    :param cur: text of query to execute.
  103.  
  104.    :rtype: list
  105.    :return: list with result of query if successfull, \
  106.    otherwise - exception text.
  107.    """
  108.  
  109.     exception_text = None
  110.     try:
  111.         cur.execute(query)
  112.         cur.connection.commit()
  113.     except psycopg2.Error as e:
  114.         exception_text = e.diag.message_primary
  115.         print('ERROR: ' + exception_text)
  116.         return exception_text
  117.  
  118.     data = cur.fetchall()
  119.  
  120.     return data
  121.  
  122.  
  123. def execute_query_with_dblink(cur, query, server):
  124.     """Executes query with dblink.
  125.  
  126.    :type cur: psycopg2.extensions.cursor
  127.    :param cur: cursor to current database.
  128.  
  129.    :type query: str
  130.    :param cur: text of query to execute with connection string \
  131.    in dblink like this dbname=dbname host=host \
  132.    user=user password=password port=port.
  133.  
  134.    :type server: str
  135.    :param server: options are 'features', 'wpmpg01', \
  136.    'wpad02', server to connect to.
  137.  
  138.    :rtype: nothing
  139.    :return: nothing if successfull, otherwise - exception text.
  140.    """
  141.  
  142.     exception_text = None
  143.     credentials = get_credentials(server)
  144.  
  145.     try:
  146.         query = re.sub('dbname=dbname', 'dbname=' +
  147.                        credentials['server'], query)
  148.         query = re.sub('host=host', 'host=' + credentials['host'], query)
  149.         query = re.sub('user=user', 'user=' + credentials['user'], query)
  150.         query = re.sub('password=password', 'password=' +
  151.                        credentials['password'], query)
  152.         query = re.sub('port=port', 'port=' + credentials['port'], query)
  153.         cur.execute(query)
  154.         cur.connection.commit()
  155.     except psycopg2.Error as e:
  156.         exception_text = e.diag.message_primary
  157.         print('ERROR: ' + exception_text)
  158.         return exception_text
  159.  
  160.  
  161. def read_sql(path):
  162.     """Reads SQL query.
  163.  
  164.    :type path: str
  165.    :param path: path to SQL query to read.
  166.  
  167.    :rtype: nothing
  168.    :return: nothing if successfull, otherwise - standard error.
  169.    """
  170.  
  171.     with open(path, 'r') as sql_file:
  172.         query = sql_file.read()
  173.  
  174.     return query
  175.  
  176.  
  177. def write_to_stdout(path, data, columns_id_to_int):
  178.     """Writes to stdout.
  179.  
  180.    :type path: str
  181.    :param path: path to stdout to write to.
  182.  
  183.    :type data: list
  184.    :param data: data to write.
  185.    
  186.    :type columns_to_int: list
  187.    :param columns_to_int: list of columns to convert to integer.
  188.  
  189.    :rtype: nothing
  190.    :return: nothing if successfull, otherwise - standard error.
  191.    """
  192.  
  193.     data = pd.DataFrame(data)
  194.    
  195.     for i in columns_id_to_int:
  196.         data[data.columns[i]] = data[data.columns[i]].fillna(-1).astype(int)
  197.    
  198.     data.to_csv(path, header=False, index=False,
  199.                 quoting=csv.QUOTE_NONNUMERIC,
  200.                 float_format='%g', quotechar='\'', na_values=None)
  201.  
  202.  
  203. def read_from_stdout_and_insert(path, cur, table):
  204.     """Reads from stdout and inserts into destination table.
  205.  
  206.    :type cur: psycopg2.extensions.cursor
  207.    :param cur: cursor to database.
  208.  
  209.    :type path: str
  210.    :param path: path to stdout to read from.
  211.  
  212.    :type table: str
  213.    :param table: destination table name.
  214.  
  215.    :rtype: nothing
  216.    :return: nothing if successfull, otherwise - standard error.
  217.    """
  218.  
  219.     with open(path, 'r') as file_handler:
  220.         for line in file_handler:
  221.             line = re.sub("''", 'None', line)
  222.             line = eval(line)
  223.             try:
  224.                 cur.execute('INSERT INTO %s VALUES %s', (AsIs(table), line))
  225.             except psycopg2.Error as e:
  226.                 exception_text = e.diag.message_primary
  227.                 print('ERROR: ' + exception_text)
  228.  
  229.             cur.connection.commit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement