Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pyspark.sql.types import *
- from rorc_csv.utils import *
class PrepareDatio:
    """Load the Datio parquet dataset and keep only the rows of interest."""

    def __init__(self, spark, path):
        """Store the Spark session and the parquet location to read from."""
        self.spark = spark
        self.path = path

    def load_data(self):
        """Read the parquet data found at ``self.path`` into a DataFrame."""
        reader = self.spark.read
        return reader.parquet(self.path)

    def filters(self, df):
        """Return the rows of ``df`` that satisfy all three conditions.

        Kept rows have ``cutoff_date`` on or after 2019-01-01,
        ``operation_repository_id`` equal to '94' and ``entity_id``
        different from '9016'.
        """
        from_2019_onward = df.cutoff_date >= '2019-01-01'
        is_repository_94 = df.operation_repository_id == '94'
        not_entity_9016 = df.entity_id != '9016'
        return (
            df.filter(from_2019_onward)
            .filter(is_repository_94)
            .filter(not_entity_9016)
        )

    def prepare_datio(self):
        """Load the dataset and apply :meth:`filters` to it."""
        return self.filters(self.load_data())
- #######################################################################################################################
class PrepareDM:
    """Load the DM CSV extract, normalise its column names and filter it."""

    def __init__(self, spark, path):
        """Store the Spark session and the CSV location to read from."""
        self.spark = spark
        self.path = path

    def load_data(self):
        """Read the ';'-delimited CSV with a header row and inferred schema.

        Rows in which every column is null are dropped.
        """
        return (
            self.spark.read
            .option('delimiter', ';')
            .option('header', True)
            .option("inferSchema", "true")
            .csv(self.path)
            .dropna(how='all')
        )

    def convert_namings(self, df):
        """Derive the normalised columns and rename the source columns.

        Adds ``cutoff_date``, ``entity_id``, ``currency_id`` and
        ``net_fee_new_lc_amount`` (cast to ``DecimalType(26, 6)``), renames
        the remaining source columns through ``change_namings_col`` and drops
        the source columns that were only used to derive new ones.

        :param df: DataFrame as returned by :meth:`load_data`.
        :return: DataFrame with the converted column names.
        """
        # NOTE(review): the column-name literals below contain U+FFFD
        # replacement characters. They are kept exactly as found because they
        # may match how the CSV header is decoded -- confirm against the file.
        converted = (
            df
            .withColumn("cutoff_date", to_cut_off_date("Mes"))
            .withColumn("entity_id", to_entity_id("Geograf�a EFAN"))
            # Single-column add: ``withColumns`` expects one dict argument and
            # fails when called with a name and a column.
            .withColumn("currency_id", to_currency_type("Geograf�a EFAN"))
            .withColumnRenamed(
                "Repositorio Origen de Rentabilidad",
                change_namings_col("Repositorio Origen de Rentabilidad"),
            )
            .withColumn(
                'net_fee_new_lc_amount',
                df['MB NP Anualizado �H'].cast(DecimalType(26, 6)),
            )
        )

        # Remaining columns only need their name mapped by change_namings_col.
        columns_to_rename = (
            "Operacion",
            "Costes NP Anualizados �H",
            "Impuestos NP Anualizado �H",
            "BDI NP Anualizada �H",
            "APR NP Riesgo Operacional SP �H",
            "% RORC NP �H",
        )
        for source_name in columns_to_rename:
            converted = converted.withColumnRenamed(
                source_name, change_namings_col(source_name)
            )

        return converted.drop("Mes", "Geograf�a EFAN", "MB NP Anualizado �H")

    def filters(self, df):
        """Keep rows from 2019 onward, repository 94, entity other than 9016.

        The kept rows have ``cutoff_date`` >= '2019-01-01',
        ``operation_repository_id`` == '94' and ``entity_id`` != '9016'
        (the original note identifies the excluded entity as Mexico).

        :param df: DataFrame exposing the three columns above.
        :return: the filtered DataFrame.
        """
        return (
            df
            .filter(df.cutoff_date >= '2019-01-01')
            .filter(df.operation_repository_id == '94')
            .filter(df.entity_id != '9016')
        )

    def prepare_dm(self):
        """Run the full pipeline: load, convert namings, then filter."""
        df = self.load_data()
        df_namings = self.convert_namings(df)
        return self.filters(df_namings)
- ######################################################################################################################
class GetFlowType:
    """Split a DataFrame by which net-fee amount column is non-zero."""

    def __init__(self, df):
        """Keep a reference to the DataFrame to be filtered."""
        self.df = df

    def _rows_with_nonzero(self, column_name):
        """Return the rows of ``self.df`` whose ``column_name`` is not 0."""
        frame = self.df
        return frame.filter(frame[column_name] != 0)

    def prepare_np(self):
        """Return rows whose ``net_fee_new_lc_amount`` is not 0."""
        return self._rows_with_nonzero('net_fee_new_lc_amount')

    def prepare_stock(self):
        """Return rows whose ``net_fee_stk_lc_amount`` is not 0."""
        return self._rows_with_nonzero('net_fee_stk_lc_amount')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement