Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #from __future__ import absolute_import
- import logging
- import apache_beam as beam
- from apache_beam.io import ReadFromText
- from apache_beam.io import WriteToText
- from apache_beam.options.pipeline_options import PipelineOptions
- from apache_beam.options.pipeline_options import SetupOptions
- from apache_beam.options.pipeline_options import GoogleCloudOptions
- from apache_beam.options.pipeline_options import StandardOptions
# Google Cloud Storage locations read and written by the pipeline.
# Alternative input used previously: gs://up201405219-final/EVENTS.csv
input_filename = "gs://up201405219-final/file_to_an0.csv"
output_filename1 = "gs://up201405219-final/output_big/output1.txt"
output_filename2 = "gs://up201405219-final/output_big/output2.txt"

# Command-line style flags handed to PipelineOptions for the Dataflow job.
dataflow_options = [
    '--project=lexical-descent-231918',
    '--job_name=job2',
    '--temp_location=gs://up201405219-final/tmp',
    '--staging_location=gs://up201405219-final/stage',
]
options = PipelineOptions(dataflow_options)
# NOTE(review): gcloud_options is not referenced anywhere else in this file.
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = "dataflow"
class Split(beam.DoFn):
    """Parse one CSV line into a ``{"subject", "hospital", "item"}`` record.

    Columns 1, 2 and 4 (0-based) are read as the subject, hospital and item
    ids. An empty (or whitespace-only) column is kept as the empty string
    ``""``; any other value is converted with ``int``.

    Lines that cannot be parsed are skipped with a logged warning instead of
    raising and failing the whole job. This covers:

    * blank lines;
    * lines with fewer than five columns;
    * lines whose id columns are not integers, e.g. a header row.
    """

    def process(self, element):
        # Imported locally so the DoFn does not depend on module-level globals
        # being available on remote workers (save_main_session is not set).
        import csv
        import logging

        # The csv parser handles quoted fields (e.g. "a,b"), which a plain
        # str.split(",") would break apart; an empty line yields no row.
        fields = next(csv.reader([element]), [])
        if len(fields) < 5:
            logging.warning("Skipping CSV line with too few columns: %r", element)
            return []
        try:
            record = {
                "subject": self._parse_id(fields[1]),
                "hospital": self._parse_id(fields[2]),
                "item": self._parse_id(fields[4]),
            }
        except ValueError:
            logging.warning("Skipping CSV line with non-integer id: %r", element)
            return []
        return [record]

    @staticmethod
    def _parse_id(raw):
        """Return ``""`` for an empty/blank column, otherwise ``int(raw)``.

        Raises:
            ValueError: if the column is non-empty and not an integer.
        """
        raw = raw.strip()
        return int(raw) if raw else ""
class CollectSubjectsPerItems(beam.DoFn):
    """Re-key a parsed record by item: emit a single ``(item, subject)`` pair."""

    def process(self, element):
        item = element["item"]
        subject = element["subject"]
        pair = (item, subject)
        return [pair]
class CollectItemsPerHospitals(beam.DoFn):
    """Re-key a parsed record by hospital: emit a single ``(hospital, item)`` pair."""

    def process(self, element):
        hospital = element["hospital"]
        item = element["item"]
        pair = (hospital, item)
        return [pair]
class CollectDistinct(beam.DoFn):
    """Summarise a grouped ``(key, values)`` pair by distinct and total values.

    Emits ``(key, [n_distinct, distinct_values, n_total, all_values])``, where:

    * ``distinct_values`` keeps each value's first occurrence, in input order;
    * ``all_values`` lists every value, duplicates included.

    Values must be hashable. In this pipeline they are the ``int``/``""``
    subject ids produced by ``Split``.
    """

    def process(self, element):
        key, values = element[0], element[1]
        all_values = list(values)
        # A set gives O(1) membership tests. A list-membership scan here would
        # be O(n^2) for keys with many values. The list keeps first-seen order.
        seen = set()
        distinct = []
        for value in all_values:
            if value not in seen:
                seen.add(value)
                distinct.append(value)
        return [(key, [len(distinct), distinct, len(all_values), all_values])]
class CollectAll(beam.DoFn):
    """Count every value in a grouped ``(key, values)`` pair.

    Emits ``(key, [count])``. Duplicates are counted, so this is the total
    number of values for the key, not the number of distinct values.
    """

    def process(self, element):
        key, values = element[0], element[1]
        # Stream the count instead of copying the iterable into a list first:
        # grouped values can be large, and only their number is needed.
        count = sum(1 for _ in values)
        return [(key, [count])]
class WriteToCSV(beam.DoFn):
    """Render a ``(key, value)`` pair as one output text line ``"key.value"``.

    NOTE(review): the class name suggests CSV, but key and value are joined
    with "." rather than ","; confirm this separator is intended.
    """

    def process(self, element):
        key = element[0]
        value = element[1]
        line = "{}.{}".format(key, value)
        return [line]
# Build the pipeline; exiting the ``with`` block runs it.
with beam.Pipeline(options=options) as p:
    # Parse each input line into a record dict.
    rows = p | ReadFromText(input_filename) | beam.ParDo(Split())

    # For each item: how many different patients (subjects) had it.
    item_subject_pairs = rows | beam.ParDo(CollectSubjectsPerItems())
    subjects = (
        item_subject_pairs
        | "Groupping subjects" >> beam.GroupByKey()
        | "Distinct subjects" >> beam.ParDo(CollectDistinct())
        | "To CSV 1" >> beam.ParDo(WriteToCSV())
        | "Write 1" >> WriteToText(output_filename1)
    )

    # For each hospital: how many different items occurred.
    # NOTE(review): this intent and the step label both say "distinct", but
    # CollectAll counts every item, including repeats. Confirm which count is
    # wanted before relying on output2.
    hospital_item_pairs = rows | beam.ParDo(CollectItemsPerHospitals())
    hospitals = (
        hospital_item_pairs
        | "Groupping items" >> beam.GroupByKey()
        | "Distinct itmes" >> beam.ParDo(CollectAll())
        | "To CSV 2" >> beam.ParDo(WriteToCSV())
        | "Write 2" >> WriteToText(output_filename2)
    )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement