from pyspark.sql import Row


def generate_rows(key, value):
    """Explode one (identificativiflusso, datipod) pair into three Rows,
    one per measure type: active energy ('a'), power ('Pot'),
    reactive energy ('r')."""
    # Fields common to all three measure types.
    fields = [
        ('piva_vend', key.PIvaUtente),
        ('piva_dist', key.PIvaDistributore),
        ('cod_contr_disp', key.CodContrDisp),
        ('pod', value.Pod),
        ('data_misura', value.DataMisura),
        ('data_prestazione', value.DataPrest),
        ('codice_pratica_sii', value.CodPrat_SII),
        ('mese_anno', value.MeseAnno),
        ('trattamento', value.DatiPdp.Trattamento),
        ('tensione', value.DatiPdp.Tensione),
        ('forfait', value.DatiPdp.Forfait),
        ('gruppo_mis', value.DatiPdp.GruppoMis),
        ('validato', value.Misura.Validato),
        ('pot_max', value.Misura.PotMax),
    ]
    # Active energy: EaF1..EaF3 with constant Ka.
    fields_a = dict(fields + [
        ('tipo_misura', 'a'),
        ('k', value.DatiPdp.Ka),
        ('f1', value.Misura.EaF1),
        ('f2', value.Misura.EaF2),
        ('f3', value.Misura.EaF3),
    ])
    # Power: PotF1..PotF3 with constant Kp.
    fields_p = dict(fields + [
        ('tipo_misura', 'Pot'),
        ('k', value.DatiPdp.Kp),
        ('f1', value.Misura.PotF1),
        ('f2', value.Misura.PotF2),
        ('f3', value.Misura.PotF3),
    ])
    # Reactive energy: ErF1..ErF3 with constant Kr.
    fields_r = dict(fields + [
        ('tipo_misura', 'r'),
        ('k', value.DatiPdp.Kr),
        ('f1', value.Misura.ErF1),
        ('f2', value.Misura.ErF2),
        ('f3', value.Misura.ErF3),
    ])
    # Return one Row per measure type; the flatMap downstream flattens them.
    return [Row(**fields_a), Row(**fields_p), Row(**fields_r)]


df0 = spark.read.parquet('/notebook/2F2W2BN79/parquet/')
# Pair each flow identifier with each of its POD records.
rdd = (df0.rdd
       .map(lambda row: (row.identificativiflusso, row.datipod))
       .flatMapValues(lambda pods: pods))
# Tuple unpacking in lambdas is Python 2 only, so unpack via *kv;
# flatMap yields the three Rows produced for each (key, value) pair.
rdd = rdd.flatMap(lambda kv: generate_rows(*kv))
df1 = rdd.toDF()
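
A quick sanity check of the result, assuming the pipeline above has run in the same session (df1 is the DataFrame built there): the schema should be flat, and each (flow, POD) pair should contribute one row per measure type.

df1.printSchema()
# Each input pair yields three rows, so the three counts should match.
df1.groupBy('tipo_misura').count().show()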