def read_mapr03r_table(schema, table):
    '''
    Given the schema and table names in mapr03r, returns a Pandas
    DataFrame. This function piggybacks on Mac's Swagger tool
    (location: http://hdp0019.infores.com:9999/__swagger__/
    source: http://csmrnd01.infores.com:3000/CSM/r-apis/src/master/hive-meta/hive-meta-api.R)
    Its significant advantage over `run_pyhive_query` (which runs a
    PyHive query and returns a pandas DataFrame) is speed: it reads
    the table's files directly instead of going through HiveServer2.
    Works for both text and ORC storage formats.
    Limitation: currently works only for mapr03r tables.

    Parameters:
    ----------
    schema: schema name in mapr03r
    table: table name in the aforementioned schema

    Returns:
    ----------
    table as a Pandas DataFrame object
    '''
    from os.path import join
    from os import listdir
    from pyarrow import orc
    import pandas as pd
    import requests

    # Ask the metadata API to describe the table.
    api_endpoint = 'http://hdp0019.infores.com:9999/describe'
    request_url = "%s?table=%s&schema=%s" % (api_endpoint, table, schema)
    resp = requests.get(request_url).json()

    # resp[0] lists the columns; resp[1] lists table properties
    # such as Location, field.delim and InputFormat.
    col_names = [item['name'] for item in resp[0]]
    table_dir = [item['value'] for item in resp[1] if item['property'] == 'Location'][0]
    sep = [item['value'] for item in resp[1] if item['property'] == 'field.delim']
    if len(sep):
        sep = sep[0]
    else:
        sep = '|'  # some tables don't carry the field.delim attribute

    input_format = [item['value'] for item in resp[1] if item['property'] == 'InputFormat'][0]
    chunks = []
    if input_format == 'org.apache.hadoop.mapred.TextInputFormat':
        for file in listdir(table_dir):
            chunk = pd.read_csv(join(table_dir, file), sep=sep, header=None, names=col_names)
            chunks.append(chunk)
    elif input_format == 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat':
        for file in listdir(table_dir):
            filepath = join(table_dir, file)
            with open(filepath, 'rb') as f:
                data = orc.ORCFile(f)
                chunk = data.read().to_pandas()  # ORCFile.read() yields a pyarrow.Table
                chunks.append(chunk)
    else:
        print('Unrecognized input format, aborting read.')
        return None
    return pd.concat(chunks)
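
# A minimal usage sketch for read_mapr03r_table. The schema and table
# names below are hypothetical placeholders, not tables known to exist
# in mapr03r; substitute your own:
#
#     df = read_mapr03r_table('my_schema', 'my_table')
#     print(df.shape)
#     df.head()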
def run_pyhive_query(server, query, drop_schema_name=True):
    '''
    Runs a query on one of the Hadoop clusters, mapr03r or mapre04p.
    Since the connection is authenticated via netrc, you need a
    ~/.netrc file resembling the following:
        machine hdp0019.infores.com login msabp password *****
    The file must be readable only by its owner (chmod 600), otherwise
    Python's netrc module refuses to parse a file containing passwords.

    Parameters:
    ----------
    server: `mapr03r` or `mapre04p`
    query: SQL query as a string (typically a triple-quoted string)
    drop_schema_name: bool (default True); if True, drops the schema
        prefix from the columns of the DataFrame generated by the query

    Returns:
    ----------
    pd.DataFrame object with the schema name dropped from columns.
    '''
    import pandas as pd
    from pyhive import hive
    from netrc import netrc

    if server == 'mapr03r':
        host = "hdp0019.infores.com"
    elif server == 'mapre04p':
        host = "lnx1170.ch3.prod.i.com"
    else:
        print("Server name not entered correctly!")
        return None

    # Pull credentials for the host from ~/.netrc
    info = netrc()
    user = info.authenticators(host)[0]
    passwd = info.authenticators(host)[2]
    auth = "LDAP"
    port = 10000

    # establish connection
    conn = hive.Connection(host=host, port=port, username=user, password=passwd, auth=auth)
    try:
        df = pd.read_sql(query, conn)
        # Hive returns columns as `schema.column`; strip the prefix.
        if drop_schema_name:
            if all(item.count('.') == 1 for item in df.columns):
                df.columns = [item.split(".")[1] for item in df.columns]
            else:
                print("Cannot drop schema name from query result.")
        return df
    except TypeError:
        print("TypeError: 'NoneType' object is not iterable -- the query did not return a result set!")
        return None
    finally:
        conn.close()
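
# A minimal usage sketch for run_pyhive_query, assuming a properly
# configured ~/.netrc (see the docstring above). The schema and table
# names are hypothetical placeholders:
#
#     query = '''
#     SELECT *
#     FROM my_schema.my_table
#     LIMIT 10
#     '''
#     df = run_pyhive_query('mapr03r', query)
#     df.columns  # schema prefix dropped by default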