Advertisement
Guest User

Untitled

a guest
Nov 12th, 2019
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.17 KB | None | 0 0
  1. from pyspark.sql.types import *
  2.  
  3. from rorc_csv.utils import *
  4.  
  5.  
  6. class PrepareDatio:
  7.  
  8. def __init__(self, spark, path):
  9. self.spark = spark
  10. self.path = path
  11.  
  12. def load_data(self):
  13. return self.spark.read.parquet(self.path)
  14.  
  15. def filters(self, df):
  16. return df \
  17. .filter(df.cutoff_date >= '2019-01-01') \
  18. .filter(df.operation_repository_id == '94') \
  19. .filter(df.entity_id != '9016')
  20.  
  21. def prepare_datio(self):
  22. df = self.load_data()
  23. df_filtered = self.filters(df)
  24. return df_filtered
  25.  
  26.  
  27. #######################################################################################################################
  28.  
  29. class PrepareDM:
  30. def __init__(self, spark, path):
  31. self.spark = spark
  32. self.path = path
  33.  
  34. def load_data(self):
  35. return self.spark.read.option('delimiter', ';') \
  36. .option('header', True) \
  37. .option("inferSchema", "true") \
  38. .csv(self.path).dropna(how='all')
  39.  
  40. def convert_namings(self, df):
  41. return df \
  42. .withColumn("cutoff_date", to_cut_off_date("Mes")) \
  43. .withColumn("entity_id", to_entity_id("Geograf�a EFAN")) \
  44. .withColumns("currency_id", to_currency_type("Geograf�a EFAN")) \
  45. .withColumnRenamed("Repositorio Origen de Rentabilidad",
  46. change_namings_col("Repositorio Origen de Rentabilidad")) \
  47. .withColumn('net_fee_new_lc_amount', df['MB NP Anualizado �H'].cast(DecimalType(26, 6))) \
  48. .withColumnRenamed("Operacion", change_namings_col("Operacion")) \
  49. .withColumnRenamed("Costes NP Anualizados �H", change_namings_col("Costes NP Anualizados �H")) \
  50. .withColumnRenamed("Impuestos NP Anualizado �H", change_namings_col("Impuestos NP Anualizado �H")) \
  51. .withColumnRenamed("BDI NP Anualizada �H", change_namings_col("BDI NP Anualizada �H")) \
  52. .withColumnRenamed("APR NP Riesgo Operacional SP �H", change_namings_col("APR NP Riesgo Operacional SP �H")) \
  53. .withColumnRenamed("% RORC NP �H", change_namings_col("% RORC NP �H")) \
  54. .drop("Mes", "Geograf�a EFAN", "MB NP Anualizado �H")
  55.  
  56. def filters(self, df):
  57. """
  58. Year 2019, operation repository 94, entity is not Mexico
  59.  
  60. :param df:
  61. :return:
  62. """
  63. return df \
  64. .filter(df.cutoff_date >= '2019-01-01') \
  65. .filter(df.operation_repository_id == '94') \
  66. .filter(df.entity_id != '9016')
  67.  
  68. def prepare_dm(self):
  69. df = self.load_data()
  70. df_namings = self.convert_namings(df)
  71. df_filtered = self.filters(df_namings)
  72.  
  73. return df_filtered
  74.  
  75.  
  76. ######################################################################################################################
  77.  
  78. class GetFlowType:
  79. def __init__(self, df):
  80. self.df = df
  81.  
  82. def prepare_np(self):
  83. return self.df.filter(self.df['net_fee_new_lc_amount'] != 0)
  84.  
  85. def prepare_stock(self):
  86. return self.df.filter(self.df['net_fee_stk_lc_amount'] != 0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement