Guest User

Untitled

a guest
Dec 7th, 2019
198
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. """A streaming python pipeline to read in PUBSUB tweets and perform
  2. classification"""
  3.  
  4. from __future__ import absolute_import
  5.  
  6. import argparse
  7. import datetime
  8. import json
  9. import logging
  10. import numpy as np
  11. import os
  12.  
  13. import apache_beam as beam
  14. import apache_beam.transforms.window as window
  15. from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
  16. from apache_beam.options.pipeline_options import StandardOptions
  17. from apache_beam.options.pipeline_options import GoogleCloudOptions
  18. from apache_beam.options.pipeline_options import SetupOptions
  19. from apache_beam.options.pipeline_options import PipelineOptions
  20.  
  21. from apache_beam.transforms.util import BatchElements
  22.  
  23. from googleapiclient import discovery
  24.  
  25.  
# GCP project id is injected via the environment; everything below pins the
# pipeline's deployment configuration.
PROJECT_ID = os.getenv('PROJECT_ID')
# Beam runner used by run(); 'DataflowRunner' executes on Cloud Dataflow.
PIPELINE_MODE = 'DataflowRunner'
# Discovery document for the AI Platform ('ml', 'v1') client built in
# init_service().
DISCOVERY_SERVICE = 'https://storage.googleapis.com/cloud-ml/discovery' \
                    '/ml_v1_discovery.json'
BUCKET = 'my-project-dev'
STAGING_LOCATION = 'gs://my-project-dev/my-project-twitter/staging'
TEMP_LOCATION = 'gs://my-project-dev/my-project-twitter/tmp'
REGION = 'us-central1'
# BigQuery output locations (dataset plus the two destination tables).
DATASET = 'newsml_dataset_twitter'
TWITTER_TABLE = 'twitter_posts'
TWITTER_TABLE_SENTIMENT = 'twitter_mean_sentiment'
# Fully-qualified resource name of the deployed AI Platform model.
MODEL_URL = 'projects/my-project-257304/models/twitter'


# Per-worker singleton AI Platform client; created lazily by init_service().
service = None
  41.  
  42.  
  43. def init_service():
  44.     """
  45.  
  46.    :return:
  47.    """
  48.     global service
  49.     try:
  50.         if service is None:
  51.             service = discovery.build('ml', 'v1',
  52.                                       discoveryServiceUrl=DISCOVERY_SERVICE,
  53.                                       cache_discovery=True)
  54.     except Exception as e:
  55.         logging.exception(e)
  56.  
  57.  
  58. def aggregate_format(key_values):
  59.     # Aggregate tweets per 10 second window
  60.     (key, values) = key_values
  61.  
  62.     mean_sentiment = np.mean([x['sentiment'] for x in values])
  63.     mean_timestamp = datetime.datetime.utcfromtimestamp(np.mean([
  64.         (datetime.datetime.strptime(x["posted_at"],
  65.                                     '%Y-%m-%d %H:%M:%S') -
  66.          datetime.datetime.fromtimestamp(
  67.              0)).total_seconds() for x in values
  68.     ]))
  69.  
  70.     logging.info("mean sentiment")
  71.     logging.info(mean_sentiment)
  72.  
  73.     logging.info("mean timestamp")
  74.     logging.info(mean_timestamp)
  75.  
  76.     # Return in correct format, according to BQ schema
  77.     return {"posted_at": mean_timestamp.strftime('%Y-%m-%d %H:%M:%S'),
  78.             "sentiment": mean_sentiment}
  79.  
  80.  
  81. def prediction(instances):
  82.     """
  83.    Calls the tweet_sentiment_classifier API on AI Platform to get predictions.
  84.  
  85.    Args:
  86.        instances: list of strings.
  87.    Returns:
  88.        float: estimated values
  89.    """
  90.  
  91.     # Init the Platform API
  92.     init_service()
  93.  
  94.     request_data = {'instances': instances}
  95.     logging.info("Making request to the ML API")
  96.  
  97.     # Call the model
  98.     response = service.projects().predict(body=request_data,
  99.                                           name=MODEL_URL).execute()
  100.  
  101.     # Read out the scores
  102.     values = [item["score"] for item in response['predictions']]
  103.     return values
  104.  
  105.  
  106. def estimate(messages):
  107.     # Be able to cope with a single string as well
  108.     if not isinstance(messages, list):
  109.         messages = [messages]
  110.  
  111.     # Messages from PubSub are JSON strings
  112.     instances = list(map(lambda message: json.loads(message), messages))
  113.  
  114.     # Estimate the sentiment of the 'text' of each tweet
  115.     scores = prediction([instance["text"] for instance in instances])
  116.  
  117.     # Join them together
  118.     for i, instance in enumerate(instances):
  119.         instance['sentiment'] = scores[i]
  120.  
  121.     logging.info("First message in batch:")
  122.     logging.info(instances[0])
  123.  
  124.     return instances
  125.  
  126.  
def run(argv=None):
    """Build and run the streaming tweet-sentiment pipeline.

    Reads tweet JSON from PubSub, windows and batches it, scores each batch
    via the AI Platform model, writes every scored tweet to BigQuery, and
    writes a per-window mean sentiment row to a second BigQuery table.

    :param argv: optional argument list for the parser (defaults to
        sys.argv when None); unrecognised args are forwarded to Beam.
    :return: None; blocks until the pipeline run finishes.
    """

    # Make explicit BQ schema for output tables:
    # Tweets tables.
    bigqueryschema_json = '{"fields": [' \
                          '{"name":"id","type":"STRING"},' \
                          '{"name":"text","type":"STRING"},' \
                          '{"name":"user_id","type":"STRING"},' \
                          '{"name":"sentiment","type":"FLOAT"},' \
                          '{"name":"posted_at","type":"TIMESTAMP"}' \
                          ']}'
    bigqueryschema = parse_table_schema_from_json(bigqueryschema_json)
    # Tweets sentiment tables.
    bigqueryschema_mean_json = '{"fields": [' \
                               '{"name":"posted_at","type":"TIMESTAMP"},' \
                               '{"name":"sentiment","type":"FLOAT"}' \
                               ']}'
    bigqueryschema_mean = parse_table_schema_from_json(bigqueryschema_mean_json)

    # Build and run the pipeline.
    parser = argparse.ArgumentParser()
    # Subscription and topic are alternative input sources; at most one may
    # be supplied on the command line (each has a default).
    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        '--input_subscription',
        help=('Input PubSub subscription of the form '
              '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'),
        default="projects/my-project-257304/subscriptions/my-project-twitter"

    )
    group.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>."'),
        default="projects/my-project-257304/topics/my-project-twitter"
    )

    # Anything argparse doesn't recognise goes to Beam's own option parser.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    # Run on Cloud DataFlow by default
    pipeline_options.view_as(StandardOptions).runner = PIPELINE_MODE
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.staging_location = STAGING_LOCATION
    google_cloud_options.temp_location = TEMP_LOCATION
    google_cloud_options.region = REGION

    p = beam.Pipeline(options=pipeline_options)

    # Read from PubSub into a PCollection.
    # NOTE(review): --input_subscription has a non-empty default, so this
    # branch always takes the subscription path; the topic branch is only
    # reachable if that default is removed — confirm intent.
    if known_args.input_subscription:
        lines = p | "read in tweets" >> beam.io.ReadFromPubSub(
            subscription=known_args.input_subscription,
            with_attributes=False,
            id_label="tweet_id"
        )
    else:
        lines = p | "read in tweets" >> beam.io.ReadFromPubSub(
            topic=known_args.input_topic,
            with_attributes=False,
            id_label="tweet_id")

    # Window them, and batch them into batches of 50 (not too large)
    output_tweets = (lines
                     | 'Assign window key' >> beam.WindowInto(
            window.FixedWindows(10))
                     | 'Batch into n batches' >> BatchElements(
            min_batch_size=49, max_batch_size=50)
                     | 'Predict sentiment' >> beam.FlatMap(
            lambda messages: estimate(messages))
                     )

    # Write to Bigquery
    output_tweets | 'store twitter posts' >> beam.io.WriteToBigQuery(
        table=TWITTER_TABLE,
        dataset=DATASET,
        schema=bigqueryschema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        project=PROJECT_ID
    )

    # Average out and log the mean value
    # (everything is paired with the constant key 1 so each window collapses
    # to a single mean row via aggregate_format).
    (output_tweets
     | 'pair with key' >> beam.Map(lambda x: (1, x))
     | 'group by key' >> beam.GroupByKey()
     | 'aggregate and format' >> beam.Map(aggregate_format)
     | 'store aggregated sentiment' >> beam.io.WriteToBigQuery(
            table=TWITTER_TABLE_SENTIMENT,
            dataset=DATASET,
            schema=bigqueryschema_mean,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            project=PROJECT_ID))
    result = p.run()
    # Block until the (streaming) job terminates.
    result.wait_until_finish()
  233.  
  234.  
if __name__ == '__main__':
    # INFO level so the per-batch / per-window diagnostics logged by the
    # transforms are visible on the workers.
    logging.getLogger().setLevel(logging.INFO)
    run()
RAW Paste Data