def read_mapr03r_table(schema, table):
    '''
    Given the schema and table names in mapr03r, returns a pandas
    DataFrame. This function piggybacks on Mac's Swagger tool
    (location: http://hdp0019.infores.com:9999/__swagger__/
    source: http://csmrnd01.infores.com:3000/CSM/r-apis/src/master/hive-meta/hive-meta-api.R).
    Its significant advantage over `run_pyhive_query` (which runs a
    PyHive query and returns a pandas DataFrame) is speed, since it
    reads the table's files directly. Works for both text and ORC
    storage formats.
    Limitation: currently works only for mapr03r tables.

    Parameters
    ----------
    schema: schema name in mapr03r
    table: table name in the aforementioned schema

    Returns
    -------
    the table as a pandas DataFrame
    '''
    from os.path import join
    from os import listdir
    from pyarrow import orc
    import pandas as pd
    import requests

    api_endpoint = 'http://hdp0019.infores.com:9999/describe'
    request_url = '%s?table=%s&schema=%s' % (api_endpoint, table, schema)
    resp = requests.get(request_url).json()

    # resp[0] holds the column metadata, resp[1] the table properties.
    col_names = [item['name'] for item in resp[0]]
    table_dir = [item['value'] for item in resp[1] if item['property'] == 'Location'][0]
    sep = [item['value'] for item in resp[1] if item['property'] == 'field.delim']
    # Some tables don't carry the field.delim property; fall back to '|'.
    sep = sep[0] if sep else '|'
    input_format = [item['value'] for item in resp[1] if item['property'] == 'InputFormat'][0]
    chunks = []

    if input_format == 'org.apache.hadoop.mapred.TextInputFormat':
        for file in listdir(table_dir):
            chunk = pd.read_csv(join(table_dir, file), sep=sep, header=None, names=col_names)
            chunks.append(chunk)
    elif input_format == 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat':
        for file in listdir(table_dir):
            filepath = join(table_dir, file)
            with open(filepath, 'rb') as f:
                data = orc.ORCFile(f)  # pyarrow ORC reader
                chunk = data.read().to_pandas()  # pyarrow.Table -> pandas DataFrame
            chunks.append(chunk)
    else:
        print('Unsupported input format, aborting read.')
        return None
    return pd.concat(chunks, ignore_index=True)

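# Usage sketch for read_mapr03r_table ('sales_db' and 'weekly_sales' are
# hypothetical placeholders; assumes the describe service above is
# reachable and the table's location is mounted on the local filesystem):
#     df = read_mapr03r_table('sales_db', 'weekly_sales')
#     print(df.head())
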
def run_pyhive_query(server, query, drop_schema_name=True):
    '''
    Runs a query on one of the Hadoop clusters, mapr03r or mapre04p.
    Since this authenticates the connection using netrc, one needs
    a ~/.netrc file resembling the following:
        machine hdp0019.infores.com login msabp password *****
    The file should be readable only by its owner (chmod 600).

    Parameters
    ----------
    server: 'mapr03r' or 'mapre04p'
    query: SQL query as a string (e.g. a triple-quoted string)
    drop_schema_name: bool (default True); if True, drops the schema
            name from the columns of the DataFrame generated by the query

    Returns
    -------
    pd.DataFrame object with the schema name dropped from the columns
    '''
    import pandas as pd
    from pyhive import hive
    from netrc import netrc

    if server == 'mapr03r':
        host = 'hdp0019.infores.com'
    elif server == 'mapre04p':
        host = 'lnx1170.ch3.prod.i.com'
    else:
        print('Server name not entered correctly!')
        return None

    # netrc.authenticators returns (login, account, password) for the host
    info = netrc()
    user = info.authenticators(host)[0]
    passwd = info.authenticators(host)[2]
    auth = 'LDAP'
    port = 10000

    # establish connection
    conn = hive.Connection(host=host, port=port, username=user, password=passwd, auth=auth)
    try:
        df = pd.read_sql(query, conn)
        if drop_schema_name:
            # Hive returns columns prefixed with the table name, e.g. 'table.col'
            if all(item.count('.') == 1 for item in df.columns):
                df.columns = [item.split('.')[1] for item in df.columns]
            else:
                print('Cannot drop schema name from query result.')
        return df
    except TypeError:
        print("Query did not return a result set ('NoneType' object is not iterable).")
        return None
    finally:
        conn.close()
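
if __name__ == '__main__':
    # Usage sketch for run_pyhive_query: a hypothetical query against a
    # hypothetical table; assumes ~/.netrc is set up as described above.
    example = run_pyhive_query('mapr03r', 'SELECT * FROM sales_db.weekly_sales LIMIT 10')
    if example is not None:
        print(example.head())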