Advertisement
Guest User

dataflow.py

a guest
Feb 23rd, 2021
542
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.34 KB | None | 0 0
  1. import argparse
  2. import datetime
  3. import json
  4. import logging
  5.  
  6. import apache_beam as beam
  7. from apache_beam.options.pipeline_options import PipelineOptions
  8. import apache_beam.transforms.window as window
  9.  
  10.  
  11. class GroupWindowsIntoBatches(beam.PTransform):
  12.     """A composite transform that groups Pub/Sub messages based on publish
  13.    time and outputs a list of dictionaries, where each contains one message
  14.    and its publish timestamp.
  15.    """
  16.  
  17.     def __init__(self, window_size):
  18.         # Convert minutes into seconds.
  19.         self.window_size = int(window_size * 60)
  20.  
  21.     def expand(self, pcoll):
  22.         return (
  23.             pcoll
  24.             # Assigns window info to each Pub/Sub message based on its
  25.             # publish timestamp.
  26.             | "Window into Fixed Intervals"
  27.             >> beam.WindowInto(window.FixedWindows(self.window_size))
  28.             | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
  29.             # Use a dummy key to group the elements in the same window.
  30.             # Note that all the elements in one window must fit into memory
  31.             # for this. If the windowed elements do not fit into memory,
  32.             # please consider using `beam.util.BatchElements`.
  33.             # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
  34.             | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
  35.             | "Groupby" >> beam.GroupByKey()
  36.             | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
  37.         )
  38.  
  39.  
  40. class AddTimestamps(beam.DoFn):
  41.     def process(self, element, publish_time=beam.DoFn.TimestampParam):
  42.         """Processes each incoming windowed element by extracting the Pub/Sub
  43.        message and its publish timestamp into a dictionary. `publish_time`
  44.        defaults to the publish timestamp returned by the Pub/Sub server. It
  45.        is bound to each element by Beam at runtime.
  46.        """
  47.         dict_out = json.loads(element.decode("utf-8"))
  48.         dict_out["publish_time"] = datetime.datetime.utcfromtimestamp(float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f")
  49.         yield dict_out
  50.  
  51.  
  52. class WriteBatchesToGCS(beam.DoFn):
  53.     def __init__(self, output_path):
  54.         self.output_path = output_path
  55.  
  56.     def process(self, batch, window=beam.DoFn.WindowParam):
  57.         """Write one batch per file to a Google Cloud Storage bucket. """
  58.  
  59.         ts_format = "%H:%M"
  60.         window_start = window.start.to_utc_datetime().strftime(ts_format)
  61.         window_end = window.end.to_utc_datetime().strftime(ts_format)
  62.         filename = "-".join([self.output_path, window_start, window_end])
  63.  
  64.         with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
  65.             for element in batch:
  66.                 f.write("{}\n".format(json.dumps(element)).encode("utf-8"))
  67.  
  68.  
  69. def run(input_subscription, output_path, window_size=1.0, pipeline_args=None):
  70.     # `save_main_session` is set to true because some DoFn's rely on
  71.     # globally imported modules.
  72.     pipeline_options = PipelineOptions(
  73.         pipeline_args,  streaming=True, save_main_session=True, direct_running_mode='in_memory',direct_num_workers =2
  74.     )
  75.  
  76.     with beam.Pipeline(options=pipeline_options) as pipeline:
  77.         (
  78.             pipeline
  79.             | "Read PubSub Messages"
  80.             >> beam.io.ReadFromPubSub(subscription=input_subscription)
  81.             | "Window into" >> GroupWindowsIntoBatches(window_size)
  82.             | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
  83.         )
  84.  
  85.  
  86. if __name__ == "__main__":  # noqa
  87.     logging.getLogger().setLevel(logging.INFO)
  88.  
  89.     parser = argparse.ArgumentParser()
  90.     parser.add_argument(
  91.         "--input_subscription",
  92.         help="The Cloud Pub/Sub subscription to read from.\n"
  93.         '"projects/<PROJECT_NAME>/subscriptios/<SUBSCRIPTION_NAME>".',
  94.     )
  95.     parser.add_argument(
  96.         "--window_size",
  97.         type=float,
  98.         default=1.0,
  99.         help="Output file's window size in number of minutes.",
  100.     )
  101.     parser.add_argument(
  102.         "--output_path",
  103.         help="GCS Path of the output file including filename prefix.",
  104.     )
  105.     known_args, pipeline_args = parser.parse_known_args()
  106.  
  107.     run(
  108.         known_args.input_subscription,
  109.         known_args.output_path,
  110.         known_args.window_size,
  111.         pipeline_args,
  112.     )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement