Advertisement
Guest User

Untitled

a guest
Oct 16th, 2019
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.08 KB | None | 0 0
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, StructField, StructType
  3.  
  4. def append_payer_spend(context_ts, collected_col):
  5. if len(collected_col) == 1:
  6. if collected_col[0] == Row(None,None):
  7. return Row('is_payer', 'spend')(0.0, 0.0)
  8.  
  9. collected_col = sorted(collected_col, key=lambda x: x.txTimestamp, reverse=False)
  10. is_payer = 0.0
  11. total_spend = 0.0
  12. for entry in collected_col:
  13. diff = (entry.txTimestamp - context_ts).days
  14. if diff >= 0 and diff < 7:
  15. is_payer = 1.0
  16. total_spend += entry.receiptUsdAmount
  17. return Row('is_payer', 'spend')(is_payer, total_spend)
  18.  
  19. # struct to store multiple values
  20. schema_added = StructType([
  21. StructField("is_payer", FloatType(), False),
  22. StructField("spend", FloatType(), False)])
  23.  
  24. append_payer_spend_udf = F.udf(append_payer_spend, schema_added)
  25.  
  26. new_df = df_likely_payer.withColumn("output", \
  27. append_payer_spend_udf(df_likely_payer['ts'], df_likely_payer['collected_col']))\
  28. .select(*(df_likely_payer.columns), 'output.*').drop('collected_col')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement