import argparse
import datetime
import json
import logging

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window

# `google-cloud-bigquery` ships with apache-beam[gcp]; used by WriteBatchesToBQ.
from google.cloud import bigquery


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            # Use a dummy key to group the elements in the same window.
            # Note that all the elements in one window must fit into memory
            # for this. If the windowed elements do not fit into memory,
            # consider using `beam.util.BatchElements` instead.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )


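# A minimal sketch of the `BatchElements` alternative mentioned above, assuming
# strict per-window grouping is not required (the batch sizes are illustrative,
# not tuned):
#
#   batches = (
#       pcoll
#       | "Window" >> beam.WindowInto(window.FixedWindows(60))
#       | "Batch" >> beam.BatchElements(min_batch_size=10, max_batch_size=500)
#   )

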
class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """
        # The element is already a dict here: `Split` decodes and parses the
        # raw Pub/Sub payload before this transform runs.
        dict_out = element
        dict_out["publish_time"] = datetime.datetime.utcfromtimestamp(
            float(publish_time)
        ).strftime("%Y-%m-%d %H:%M:%S.%f")
        yield dict_out


class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path, suffix):
        self.output_path = output_path
        self.suffix = suffix

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket."""
        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, self.suffix, window_start, window_end])
        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                logging.info("WriteBatchesToGCS: %s", element)
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))


class WriteBatchesToBQ(beam.DoFn):
    def __init__(self, dest_table):
        self.dest_table = dest_table

    def setup(self):
        # Create one BigQuery client per DoFn instance when the worker starts,
        # rather than once per batch.
        self.client = bigquery.Client()

    def process(self, batch):
        """Write one batch of elements to a BigQuery table.

        Note: constructing `beam.io.WriteToBigQuery` inside `process` (as the
        original version did) is a no-op, because it is a PTransform and must
        be applied to a PCollection. The BigQuery client library is used
        directly here as a streaming-insert replacement; the original
        WRITE_TRUNCATE disposition does not apply to streaming inserts.
        """
        # `insert_rows_json` expects "project.dataset.table" rather than the
        # legacy "project:dataset.table" form.
        table = self.dest_table.replace(":", ".")
        errors = self.client.insert_rows_json(table, batch)
        if errors:
            logging.error("WriteBatchesToBQ: insert errors: %s", errors)
        else:
            logging.info("WriteBatchesToBQ: wrote %d rows to %s", len(batch), table)


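# The idiomatic Beam alternative, sketched here with hypothetical names, is to
# apply `WriteToBigQuery` as a transform on the element-level PCollection
# instead of writing from inside a DoFn (WRITE_APPEND, since a streaming
# pipeline cannot truncate the table):
#
#   elements | "Write to BQ" >> beam.io.WriteToBigQuery(
#       table="project:dataset.table",
#       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
#       create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
#   )

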
class Split(beam.DoFn):
    # These tags will be used to tag the outputs of this DoFn.
    OUTPUT_TAG_BQ = 'BigQuery'
    OUTPUT_TAG_GCS = 'GCS'

    def process(self, element):
        """Tags each element as it processes the original PCollection."""
        element = json.loads(element.decode("utf-8"))
        if element['species'] == 'guinea pig':
            logging.info("Split: routing to BigQuery: %s", element)
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
        else:
            logging.info("Split: routing to GCS: %s", element)
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_GCS, element)


def run(input_subscription, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFns rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args,
        streaming=True,
        save_main_session=True,
        direct_running_mode='in_memory',
        direct_num_workers=2,
    )

    # The `with` block runs the pipeline and waits for it to finish on exit,
    # so no explicit pipeline.run().wait_until_finish() is needed (the
    # original version called it inside the block, running the pipeline twice).
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
        tagged_lines_result = (
            pipeline
            | beam.io.ReadFromPubSub(subscription=input_subscription)
            | beam.ParDo(Split()).with_outputs(
                Split.OUTPUT_TAG_BQ, Split.OUTPUT_TAG_GCS)
        )

        gcs_records = (
            tagged_lines_result[Split.OUTPUT_TAG_GCS]
            | "Window into GCS" >> GroupWindowsIntoBatches(window_size)
            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path, 'GCS'))
        )
        bq_records = (
            tagged_lines_result[Split.OUTPUT_TAG_BQ]
            | "Window into BQ" >> GroupWindowsIntoBatches(window_size)
            | "Write to BQ" >> beam.ParDo(
                WriteBatchesToBQ(dest_table='data-uat-280814:dev_radim.dataflow_test_b'))
        )


if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help="The Cloud Pub/Sub subscription to read from.\n"
             '"projects/<PROJECT_NAME>/subscriptions/<SUBSCRIPTION_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS path of the output file, including the filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_subscription,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )

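# Example invocation; the script name and resource paths below are hypothetical
# placeholders, not taken from the original paste:
#
#   python pubsub_split_pipeline.py \
#       --input_subscription projects/<PROJECT_NAME>/subscriptions/<SUBSCRIPTION_NAME> \
#       --output_path gs://<BUCKET_NAME>/output \
#       --window_size 1.0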