Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import datetime
- import json
- import logging
- import apache_beam as beam
- from apache_beam.options.pipeline_options import PipelineOptions
- import apache_beam.transforms.window as window
- from apache_beam import pvalue
- class GroupWindowsIntoBatches(beam.PTransform):
- """A composite transform that groups Pub/Sub messages based on publish
- time and outputs a list of dictionaries, where each contains one message
- and its publish timestamp.
- """
- def __init__(self, window_size):
- # Convert minutes into seconds.
- self.window_size = int(window_size * 60)
- def expand(self, pcoll):
- return (
- pcoll
- # Assigns window info to each Pub/Sub message based on its
- # publish timestamp.
- | "Window into Fixed Intervals"
- >> beam.WindowInto(window.FixedWindows(self.window_size))
- | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
- # Use a dummy key to group the elements in the same window.
- # Note that all the elements in one window must fit into memory
- # for this. If the windowed elements do not fit into memory,
- # please consider using `beam.util.BatchElements`.
- # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
- | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
- | "Groupby" >> beam.GroupByKey()
- | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
- )
- class AddTimestamps(beam.DoFn):
- def process(self, element, publish_time=beam.DoFn.TimestampParam):
- """Processes each incoming windowed element by extracting the Pub/Sub
- message and its publish timestamp into a dictionary. `publish_time`
- defaults to the publish timestamp returned by the Pub/Sub server. It
- is bound to each element by Beam at runtime.
- """
- # dict_out = json.loads(element.decode("utf-8"))
- dict_out = element
- dict_out["publish_time"] = datetime.datetime.utcfromtimestamp(float(publish_time)).strftime(
- "%Y-%m-%d %H:%M:%S.%f")
- yield dict_out
- class WriteBatchesToGCS(beam.DoFn):
- def __init__(self, output_path, suffix):
- self.output_path = output_path
- self.suffix = suffix
- def process(self, batch, window=beam.DoFn.WindowParam):
- """Write one batch per file to a Google Cloud Storage bucket. """
- ts_format = "%H:%M"
- window_start = window.start.to_utc_datetime().strftime(ts_format)
- window_end = window.end.to_utc_datetime().strftime(ts_format)
- filename = "-".join([self.output_path, self.suffix, window_start, window_end])
- with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
- for element in batch:
- print("WriteBatchesToGCS")
- print(element)
- f.write("{}\n".format(json.dumps(element)).encode("utf-8"))
- class WriteBatchesToBQ(beam.DoFn):
- def __init__(self, dest_table):
- self.dest_table = dest_table
- def process(self, batch):
- """Write one batch per file to a Google Cloud Storage bucket. """
- for element in batch:
- print("WriteBatchesToBQ")
- print(element)
- print(self.dest_table)
- beam.io.gcp.bigquery.WriteToBigQuery(table=self.dest_table,
- write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
- create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
- print('written to BQ')
- class Split(beam.DoFn):
- # These tags will be used to tag the outputs of this DoFn.
- OUTPUT_TAG_BQ = 'BigQuery'
- OUTPUT_TAG_GCS = 'GCS'
- def process(self, element):
- """
- tags the input as it processes the orignal PCollection
- """
- element = json.loads(element.decode("utf-8"))
- print(element)
- if element['species'] == 'guinea pig':
- yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
- print('found bq')
- else:
- yield pvalue.TaggedOutput(self.OUTPUT_TAG_GCS, element)
- print('found gcs')
- def run(input_subscription, output_path, window_size=1.0, pipeline_args=None):
- # `save_main_session` is set to true because some DoFn's rely on
- # globally imported modules.
- pipeline_options = PipelineOptions(
- pipeline_args, streaming=True, save_main_session=True, direct_running_mode='in_memory', direct_num_workers=2
- )
- with beam.Pipeline(options=pipeline_options) as pipeline:
- # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
- tagged_lines_result = (pipeline | beam.io.ReadFromPubSub(subscription=input_subscription)
- # | "Window into" >> GroupWindowsIntoBatches(window_size)
- | beam.ParDo(Split()).with_outputs(
- Split.OUTPUT_TAG_BQ,
- Split.OUTPUT_TAG_GCS))
- gcs_records = tagged_lines_result[Split.OUTPUT_TAG_GCS] | "Window into GCS" >> GroupWindowsIntoBatches(
- window_size) | "Write to GCS" >> beam.ParDo(
- WriteBatchesToGCS(output_path, 'GCS'))
- bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
- window_size) | "Write to BQ" >> beam.ParDo(
- WriteBatchesToBQ(dest_table='data-uat-280814:dev_radim.dataflow_test_b'))
- pipeline.run().wait_until_finish()
- if __name__ == "__main__": # noqa
- logging.getLogger().setLevel(logging.INFO)
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "--input_subscription",
- help="The Cloud Pub/Sub subscription to read from.\n"
- '"projects/<PROJECT_NAME>/subscriptios/<SUBSCRIPTION_NAME>".',
- )
- parser.add_argument(
- "--window_size",
- type=float,
- default=1.0,
- help="Output file's window size in number of minutes.",
- )
- parser.add_argument(
- "--output_path",
- help="GCS Path of the output file including filename prefix.",
- )
- known_args, pipeline_args = parser.parse_known_args()
- run(
- known_args.input_subscription,
- known_args.output_path,
- known_args.window_size,
- pipeline_args,
- )
Advertisement
Add Comment
Please, Sign In to add comment