import numpy as np
from sklearn.model_selection import train_test_split
import xgboost as xgb
import cudf
from cudf.dataframe import DataFrame
from collections import OrderedDict
import gc
from glob import glob
import os
import pyblazing
import pandas as pd
import time
from chronometer import Chronometer
from pyblazing import FileSystemType, SchemaFrom, DriverType
def register_hdfs():
    print('*** Register a HDFS File System ***')
    fs_status = pyblazing.register_file_system(
        authority="myLocalHdfs",
        type=FileSystemType.HDFS,
        root="/",
        params={
            "host": "127.0.0.1",
            "port": 54310,
            "user": "hadoop",
            "driverType": DriverType.LIBHDFS3,
            "kerberosTicket": ""  # empty when Kerberos is not enabled
        }
    )
    print(fs_status)

def deregister_hdfs():
    fs_status = pyblazing.deregister_file_system(authority="myLocalHdfs")
    print(fs_status)
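# Once registered, files under the HDFS root should be addressable through the
# authority name, e.g. 'hdfs://myLocalHdfs/mortgage/...'. That path convention
# is an assumption carried over from other pyblazing examples; it is not shown
# in this paste.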
def register_posix():
    # Register the directory containing this script as a POSIX file system.
    dir_path = os.path.dirname(os.path.realpath(__file__))
    print('*** Register a POSIX File System ***')
    fs_status = pyblazing.register_file_system(
        authority="mortgage",
        type=FileSystemType.POSIX,
        root=dir_path
    )
    print(fs_status)

def deregister_posix():
    fs_status = pyblazing.deregister_file_system(authority="mortgage")
    print(fs_status)
from libgdf_cffi import ffi, libgdf

def get_dtype_values(dtypes):
    # Map friendly type names to libgdf dtype constants, falling back to
    # GDF_INT64 for any name not in the table.
    def gdf_type(type_name):
        dicc = {
            'str': libgdf.GDF_STRING,
            'date': libgdf.GDF_DATE64,
            'date64': libgdf.GDF_DATE64,
            'date32': libgdf.GDF_DATE32,
            'timestamp': libgdf.GDF_TIMESTAMP,
            'category': libgdf.GDF_CATEGORY,
            'float': libgdf.GDF_FLOAT32,
            'double': libgdf.GDF_FLOAT64,
            'float32': libgdf.GDF_FLOAT32,
            'float64': libgdf.GDF_FLOAT64,
            'short': libgdf.GDF_INT16,
            'long': libgdf.GDF_INT64,
            'int': libgdf.GDF_INT32,
            'int32': libgdf.GDF_INT32,
            'int64': libgdf.GDF_INT64,
        }
        # dict.get with a default avoids misclassifying a constant that
        # happens to be falsy, which the original truthiness check could.
        return dicc.get(type_name, libgdf.GDF_INT64)

    values = [gdf_type(dtypes[key]) for key in dtypes]
    print('>>>> dtypes for', dtypes.values())
    print(values)
    return values
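# A minimal usage sketch for get_dtype_values; the column names below are
# illustrative, not taken from the original script:
#
#   dtypes = OrderedDict([('loan_id', 'int64'), ('servicer', 'category'),
#                         ('interest_rate', 'float64')])
#   gdf_types = get_dtype_values(dtypes)
#   # -> [libgdf.GDF_INT64, libgdf.GDF_CATEGORY, libgdf.GDF_FLOAT64]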
def get_type_schema(path):
    # Infer the pyblazing schema type from the file extension.
    ext = path.split('.')[-1]
    if ext == 'parquet':
        return SchemaFrom.ParquetFile
    elif ext == 'csv' or ext == 'psv' or ext.startswith('txt'):
        return SchemaFrom.CsvFile
    # Any other extension falls through and returns None.
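# For example (file names are illustrative):
#   get_type_schema('Performance_2000Q1.txt')  # -> SchemaFrom.CsvFile
#   get_type_schema('acquisition.parquet')     # -> SchemaFrom.ParquetFile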
def open_perf_table(table_ref):
    # table_ref maps a registered table handle (with a .table_name attribute)
    # to its columns. Note: because the return sits inside the loop, only the
    # first table in table_ref is ever queried.
    for key in table_ref.keys():
        sql = 'select * from main.%(table_name)s' % {"table_name": key.table_name}
        return pyblazing.run_query(sql, table_ref)
def run_gpu_workflow(quarter=1, year=2000, perf_file="", **kwargs):
    # The gpu_load_* loaders, the feature-engineering helpers, and
    # acq_data_path used below are defined elsewhere in the full script.
    load_start_time = time.time()
    names = gpu_load_names()
    acq_gdf = gpu_load_acquisition_csv(acquisition_path=acq_data_path + "/Acquisition_"
                                       + str(year) + "Q" + str(quarter) + ".txt")
    gdf = gpu_load_performance_csv(perf_file)
    load_end_time = time.time()

    etl_start_time = time.time()
    acq_gdf_results = merge_names(acq_gdf, names)
    everdf_results = create_ever_features(gdf)
    delinq_merge_results = create_delinq_features(gdf)
    new_everdf_results = join_ever_delinq_features(everdf_results.columns, delinq_merge_results.columns)
    joined_df_results = create_joined_df(gdf.columns, new_everdf_results.columns)
    del new_everdf_results
    testdf_results = create_12_mon_features_union(joined_df_results.columns)
    testdf = testdf_results.columns
    new_joined_df_results = combine_joined_12_mon(joined_df_results.columns, testdf)
    del testdf
    del joined_df_results
    perf_df_results = final_performance_delinquency(gdf.columns, new_joined_df_results.columns)
    del gdf
    del new_joined_df_results
    final_gdf_results = join_perf_acq_gdfs(perf_df_results.columns, acq_gdf_results.columns)
    del perf_df_results
    del acq_gdf_results
    final_gdf = last_mile_cleaning(final_gdf_results.columns)
    etl_end_time = time.time()
    return [final_gdf, (load_end_time - load_start_time), (etl_end_time - etl_start_time)]
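# A minimal driver sketch, assuming the mortgage data sits next to this
# script; the glob pattern and the quarter/year values are assumptions, not
# part of the original paste:
#
#   if __name__ == "__main__":
#       register_posix()
#       for perf_file in sorted(glob("perf/Performance_2000Q1.txt*")):
#           final_gdf, load_s, etl_s = run_gpu_workflow(
#               quarter=1, year=2000, perf_file=perf_file)
#           print('load: %.2fs, etl: %.2fs' % (load_s, etl_s))
#       deregister_posix()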