Advertisement
Guest User

bdcc_v3.1

a guest
Jun 16th, 2019
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.68 KB | None | 0 0
  1. #from __future__ import absolute_import
  2. import logging
  3. import apache_beam as beam
  4. from apache_beam.io import ReadFromText
  5. from apache_beam.io import WriteToText
  6. from apache_beam.options.pipeline_options import PipelineOptions
  7. from apache_beam.options.pipeline_options import SetupOptions
  8. from apache_beam.options.pipeline_options import GoogleCloudOptions
  9. from apache_beam.options.pipeline_options import StandardOptions
  10.  
  11. #input_filename = "gs://up201405219-final/EVENTS.csv"
  12. input_filename = "gs://up201405219-final/file_to_an0.csv"
  13. output_filename1 = "gs://up201405219-final/output_big/output1.txt"
  14. output_filename2 = "gs://up201405219-final/output_big/output2.txt"
  15.  
  16. dataflow_options = ['--project=lexical-descent-231918', '--job_name=job2', '--temp_location=gs://up201405219-final/tmp']
  17. dataflow_options.append('--staging_location=gs://up201405219-final/stage')
  18. options = PipelineOptions(dataflow_options)
  19. gcloud_options = options.view_as(GoogleCloudOptions)
  20.  
  21. options.view_as(StandardOptions).runner = "dataflow"
  22.  
  23. class Split(beam.DoFn):
  24.     def process(self, element):
  25.         line = element.split(",")
  26.         if line[1] == "":
  27.             subject_id = line[1]
  28.         else:
  29.             subject_id = int(line[1])
  30.        
  31.         if line[2] == "":
  32.             hospital_id = line[2]
  33.         else:
  34.             hospital_id = int(line[2])
  35.  
  36.         if line[4] == "":
  37.             item_id = line[4]
  38.         else:
  39.             item_id = int(line[4])
  40.  
  41.         return [{
  42.             "subject" : subject_id,
  43.             "hospital" : hospital_id,
  44.             "item": item_id
  45.         }]
  46. class CollectSubjectsPerItems(beam.DoFn):
  47.     def process(self, element):
  48.         result = [
  49.             (element["item"], element["subject"])
  50.         ]
  51.         return result
  52.  
  53. class CollectItemsPerHospitals(beam.DoFn):
  54.     def process(self, element):
  55.         result = [
  56.             (element["hospital"], element["item"])
  57.         ]
  58.         return result
  59.  
  60. class CollectDistinct(beam.DoFn):
  61.     def process(self, element):
  62.         res = []
  63.         alll =[]
  64.         count = 0
  65.         for i in element[1]:
  66.             count+=1
  67.             alll.append(i)
  68.             if i not in res:
  69.                 res.append(i)
  70.  
  71.         result = [
  72.             (element[0], [len(res), res, len(alll),  alll])
  73.         ]
  74.         return result
  75.  
  76. class CollectAll(beam.DoFn):
  77.     def process(self, element):
  78.         res = []
  79.         for i in element[1]:
  80.             res.append(i)
  81.         result = [
  82.             (element[0], [len(res)])
  83.         ]
  84.         return result
  85.  
  86. class WriteToCSV(beam.DoFn):
  87.     def process(self, element):
  88.         result = [
  89.             "{}.{}".format(element[0],
  90.             element[1]
  91.             )
  92.         ]
  93.         return result
  94.  
  95. with beam.Pipeline(options = options) as p:
  96.     rows = (
  97.         p |
  98.         ReadFromText(input_filename) |
  99.         beam.ParDo(Split())
  100.     )
  101.     # para cada item, quantos pacientes diferentes o tiveram
  102.     subjects = (
  103.         rows |
  104.         beam.ParDo(CollectSubjectsPerItems()) |
  105.         "Groupping subjects" >> beam.GroupByKey() |
  106.         "Distinct  subjects" >> beam.ParDo(CollectDistinct()) |
  107.         "To CSV 1" >> beam.ParDo(WriteToCSV()) |
  108.         "Write 1" >> WriteToText(output_filename1)
  109.         #"Counting subjects" >> beam.CombineValues(beam.combiners.CountCombineFn())
  110.     )
  111.     # para cada hospital, quantos items diferentes aconteceram
  112.     hospitals = (
  113.         rows |
  114.         beam.ParDo(CollectItemsPerHospitals()) |
  115.         "Groupping items" >> beam.GroupByKey() |
  116.         "Distinct itmes" >> beam.ParDo(CollectAll()) |
  117.         "To CSV 2" >> beam.ParDo(WriteToCSV()) |
  118.         "Write 2" >> WriteToText(output_filename2)
  119.     )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement