from pyspark.sql import Row

def generate_row(key, value):
    # Columns shared by all three measure types.
    fields = [
        ('piva_vend', key.PIvaUtente),
        ('piva_dist', key.PIvaDistributore),
        ('cod_contr_disp', key.CodContrDisp),
        ('pod', value.Pod),
        ('data_misura', value.DataMisura),
        ('data_prestazione', value.DataPrest),
        ('codice_pratica_sii', value.CodPrat_SII),
        ('mese_anno', value.MeseAnno),
        ('trattamento', value.DatiPdp.Trattamento),
        ('tensione', value.DatiPdp.Tensione),
        ('forfait', value.DatiPdp.Forfait),
        ('gruppo_mis', value.DatiPdp.GruppoMis),
        ('validato', value.Misura.Validato),
        ('pot_max', value.Misura.PotMax),
    ]

    # Active energy readings (EaF1..EaF3).
    fields_a = dict(fields + [
        ('tipo_misura', 'a'),
        ('k', value.DatiPdp.Ka),
        ('f1', value.Misura.EaF1),
        ('f2', value.Misura.EaF2),
        ('f3', value.Misura.EaF3),
    ])

    # Power readings (PotF1..PotF3).
    fields_p = dict(fields + [
        ('tipo_misura', 'Pot'),
        ('k', value.DatiPdp.Kp),
        ('f1', value.Misura.PotF1),
        ('f2', value.Misura.PotF2),
        ('f3', value.Misura.PotF3),
    ])

    # Reactive energy readings (ErF1..ErF3).
    fields_r = dict(fields + [
        ('tipo_misura', 'r'),
        ('k', value.DatiPdp.Kr),
        ('f1', value.Misura.ErF1),
        ('f2', value.Misura.ErF2),
        ('f3', value.Misura.ErF3),
    ])

    # Emit one Row per measure type; the driver flattens them with flatMap.
    return [Row(**fields_a), Row(**fields_p), Row(**fields_r)]

df0 = spark.read.parquet('/notebook/2F2W2BN79/parquet/')

# Pair each flow identifier with every element of its datipod list.
rdd = df0.rdd.map(lambda row: (row.identificativiflusso, row.datipod)) \
             .flatMapValues(lambda row_value: row_value)
# flatMap because generate_row returns three Rows per (key, value) pair.
rdd = rdd.flatMap(lambda kv: generate_row(kv[0], kv[1]))
df1 = rdd.toDF()
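
# A minimal sketch of the same reshaping done with the DataFrame API instead
# of the RDD, assuming `identificativiflusso` is a struct column, `datipod`
# is an array of structs (as the attribute access above implies), and
# Spark 2.3+ for unionByName. The names measure_columns, exploded and df1_alt
# are made up for illustration.
from pyspark.sql import functions as F

def measure_columns(tipo, k_field, f_prefix):
    # Shared identifying columns plus the per-type k / f1..f3 values.
    return [
        F.col('identificativiflusso.PIvaUtente').alias('piva_vend'),
        F.col('identificativiflusso.PIvaDistributore').alias('piva_dist'),
        F.col('identificativiflusso.CodContrDisp').alias('cod_contr_disp'),
        F.col('dp.Pod').alias('pod'),
        F.col('dp.DataMisura').alias('data_misura'),
        F.col('dp.DataPrest').alias('data_prestazione'),
        F.col('dp.CodPrat_SII').alias('codice_pratica_sii'),
        F.col('dp.MeseAnno').alias('mese_anno'),
        F.col('dp.DatiPdp.Trattamento').alias('trattamento'),
        F.col('dp.DatiPdp.Tensione').alias('tensione'),
        F.col('dp.DatiPdp.Forfait').alias('forfait'),
        F.col('dp.DatiPdp.GruppoMis').alias('gruppo_mis'),
        F.col('dp.Misura.Validato').alias('validato'),
        F.col('dp.Misura.PotMax').alias('pot_max'),
        F.lit(tipo).alias('tipo_misura'),
        F.col('dp.DatiPdp.' + k_field).alias('k'),
        F.col('dp.Misura.' + f_prefix + 'F1').alias('f1'),
        F.col('dp.Misura.' + f_prefix + 'F2').alias('f2'),
        F.col('dp.Misura.' + f_prefix + 'F3').alias('f3'),
    ]

# Explode datipod so each POD record becomes its own row, then build one
# projection per measure type and stack them.
exploded = df0.select('identificativiflusso', F.explode('datipod').alias('dp'))
df1_alt = (
    exploded.select(*measure_columns('a', 'Ka', 'Ea'))
    .unionByName(exploded.select(*measure_columns('Pot', 'Kp', 'Pot')))
    .unionByName(exploded.select(*measure_columns('r', 'Kr', 'Er')))
)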