Advertisement
Guest User

bdcc_v3.2

a guest
Jun 16th, 2019
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.72 KB | None | 0 0
  1. #from __future__ import absolute_import
  2. import logging
  3. import apache_beam as beam
  4. from apache_beam.io import ReadFromText
  5. from apache_beam.io import WriteToText
  6. from apache_beam.options.pipeline_options import PipelineOptions
  7. from apache_beam.options.pipeline_options import SetupOptions
  8. from apache_beam.options.pipeline_options import GoogleCloudOptions
  9. from apache_beam.options.pipeline_options import StandardOptions
  10.  
  # Alternate input/output locations, currently disabled; swap them in for
  # the three assignments below to use them.
  # input_filename = "gs://up201405219-final/EVENTS.csv"
  # output_filename1 = "gs://up201405219-final/output_big/output1.txt"
  # output_filename2 = "gs://up201405219-final/output_big/output2.txt"
  input_filename = "gs://up201405219-final/file_to_an0.csv"
  output_filename1 = "gs://up201405219-final/output_teste/output1.txt"
  output_filename2 = "gs://up201405219-final/output_teste/output2.txt"

  # Command-line style flags handed to Beam's PipelineOptions.
  dataflow_options = [
      '--project=lexical-descent-231918',
      '--job_name=job1',
      '--temp_location=gs://up201405219-final/tmp',
      '--staging_location=gs://up201405219-final/stage',
  ]
  options = PipelineOptions(dataflow_options)
  gcloud_options = options.view_as(GoogleCloudOptions)

  # Select the runner on the shared options object used by the pipeline.
  options.view_as(StandardOptions).runner = "dataflow"
  26.  
  class Split(beam.DoFn):
      """Parse one comma-separated input line into a small record dict."""

      @staticmethod
      def _int_or_blank(field):
          """Return ``int(field)``, or the empty string unchanged when blank."""
          return field if field == "" else int(field)

      def process(self, element):
          """Return a one-element list with the subject/hospital/item record.

          Columns 1, 2 and 4 of the split line are read. An empty column is
          kept as "" and any other value is converted with ``int()``, so a
          non-numeric value raises ValueError and a short line raises
          IndexError.
          """
          columns = element.split(",")
          record = {
              "subject": self._int_or_blank(columns[1]),
              "hospital": self._int_or_blank(columns[2]),
              "item": self._int_or_blank(columns[4]),
          }
          return [record]
  50.        
  class CollectSubjectsPerItems(beam.DoFn):
      """Re-key a parsed record as an ``(item, subject)`` pair."""

      def process(self, element):
          """Return a one-element list holding ``(item, subject)``."""
          item_id = element["item"]
          subject_id = element["subject"]
          return [(item_id, subject_id)]
  57.  
  class CollectItemsPerHospitals(beam.DoFn):
      """Re-key a parsed record as a ``(hospital, item)`` pair."""

      def process(self, element):
          """Return a one-element list holding ``(hospital, item)``."""
          hospital_id = element["hospital"]
          item_id = element["item"]
          return [(hospital_id, item_id)]
  64.  
  class CollectSubjectsPerHospitals(beam.DoFn):
      """Re-key a parsed record as a ``(hospital, subject)`` pair."""

      def process(self, element):
          """Return a one-element list holding ``(hospital, subject)``.

          The output is grouped and then joined by key with the output of
          CollectItemsPerHospitals, which is keyed by hospital. The key here
          must therefore be the hospital id as well; keying by subject (as
          this class previously did) makes the join compare subject ids with
          hospital ids.
          """
          return [(element["hospital"], element["subject"])]
  71.  
  class CollectDistinct(beam.DoFn):
      """Summarise the grouped values of a ``(key, values)`` pair."""

      def process(self, element):
          """Return ``[(key, [n_distinct, distinct, n_total, all_values])]``.

          ``distinct`` keeps the first occurrence of each value in encounter
          order; ``all_values`` is every value in encounter order.
          """
          every_value = []
          distinct = []
          # A set gives O(1) membership checks; the list keeps the order.
          seen = set()
          for value in element[1]:
              every_value.append(value)
              if value not in seen:
                  seen.add(value)
                  distinct.append(value)

          summary = [len(distinct), distinct, len(every_value), every_value]
          return [(element[0], summary)]
  87.  
  class CollectAll(beam.DoFn):
      """Count the grouped values of a ``(key, values)`` pair."""

      def process(self, element):
          """Return ``[(key, [n])]`` where ``n`` counts every value.

          Duplicates are counted too; no de-duplication happens here.
          """
          total = sum(1 for _ in element[1])
          return [(element[0], [total])]
  97.  
  class WriteToCSV(beam.DoFn):
      """Format a ``(key, value)`` pair as a single output line."""

      def process(self, element):
          """Return a one-element list with the text ``<key>.<value>``."""
          key = element[0]
          payload = element[1]
          # NOTE(review): the separator is a dot, not a comma, despite the
          # class name -- confirm whether that is intended.
          return ["{}.{}".format(key, payload)]
  106.  
  class WriteToCSV2(beam.DoFn):
      """Format one co-grouped ``(key, {'subject': ..., 'item': ...})`` row."""

      @staticmethod
      def _first_or_blank(values):
          """Return the first entry of ``values``, or "" when it is empty."""
          return next(iter(values), "")

      def process(self, element):
          """Return a one-element list with ``<key>,<subject>,<item>``.

          Only the first entry of each side is written. A key that is present
          in just one of the joined inputs has an empty group on the other
          side; that side is written as an empty field instead of raising
          IndexError.
          """
          grouped = element[1]
          line = "{},{},{}".format(
              element[0],
              self._first_or_blank(grouped['subject']),
              self._first_or_blank(grouped['item']),
          )
          return [line]
  115.  
  with beam.Pipeline(options=options) as p:
      # Read the input text file and parse every line into a record dict.
      rows = p | ReadFromText(input_filename) | beam.ParDo(Split())

      # For each item: how many different patients had it.
      subjects_per_items = (
          rows
          | beam.ParDo(CollectSubjectsPerItems())
          | "Groupping subjectsPerItems" >> beam.GroupByKey()
          | "Distinct  subjectsPerItems" >> beam.ParDo(CollectDistinct())
          | "To CSV 1" >> beam.ParDo(WriteToCSV())
          | "Write 1" >> WriteToText(output_filename1)
      )

      # For each hospital: how many different items occurred.
      # NOTE(review): this step is labelled "Distinct" but runs CollectAll,
      # which counts every grouped value -- confirm which count is wanted.
      items_per_hospitals = (
          rows
          | beam.ParDo(CollectItemsPerHospitals())
          | "Groupping hospitalsPerItems" >> beam.GroupByKey()
          | "Distinct hospitalsPerItems" >> beam.ParDo(CollectAll())
      )

      # Same shape as above, fed by CollectSubjectsPerHospitals.
      subjects_per_hospitals = (
          rows
          | beam.ParDo(CollectSubjectsPerHospitals())
          | "Groupping subjectsPerHospital" >> beam.GroupByKey()
          | "Distinct subjectsPerHospital" >> beam.ParDo(CollectAll())
      )

      # Join the two per-key counts on their keys and write one line per key.
      to_be_joined = (
          {
              'subject': subjects_per_hospitals,
              'item': items_per_hospitals,
          }
          | beam.CoGroupByKey()
          | "To CSV 2" >> beam.ParDo(WriteToCSV2())
          | "Write 2" >> WriteToText(output_filename2)
      )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement