Advertisement
Guest User

Untitled

a guest
Dec 7th, 2019
248
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.50 KB | None | 0 0
  1. # coding: utf-8
  2. """Collects tweets using hashtags #."""
  3.  
  4. import datetime
  5. import json
  6. import logging
  7. import os
  8. import sys
  9. import time
  10. import urllib3
  11.  
  12. import tweepy
  13.  
  14. from google.cloud import pubsub_v1
  15. from tweepy.streaming import StreamListener
  16.  
  17. # Configuration
  18. PROJECT_ID = os.getenv('PROJECT_ID')
  19. PUBSUB_TOPIC = os.getenv('PUBSUB_TOPIC')
  20.  
  21. # Twitter authentication
  22. CONSUMER_KEY = os.getenv('CONSUMER_KEY')
  23. CONSUMER_SECRET = os.getenv('CONSUMER_SECRET')
  24. ACCESS_TOKEN = os.getenv('ACCESS_TOKEN')
  25. ACCESS_TOKEN_SECRET = os.getenv('ACCESS_TOKEN_SECRET')
  26.  
  27. # Define the list of terms to listen to
  28. _HASHTAGS = ["#Microsoft365", "#MicrosoftTeamsRooms", "#OneNote",
  29.              "#ProjectOnline", "#MicrosoftTeams", "#ProductiveAnywhere",
  30.              "#SharePoint", "#OneDrive", "#Yammer", "#MicrosoftStream",
  31.              "#PowerApps", "#Flows", "#Office365", "#Outlook", "#PowerBI",
  32.              "#PowerPlatform", "#Kaizala", "#FormulaFriday", "#powerpoint",
  33.              "#Windows10", "#Azure", "#TipsAndTricks", "#AzureFriday",
  34.              "#DevOps", "#cybersecurity", "#levelup", "#MicrosoftSecurity",
  35.              "#technology", "#HybridCloud", "#AI", "#IoT", "#O365"]
  36.  
# Authenticate to the Twitter API with the OAuth credentials read above.
# NOTE(review): the credentials come straight from os.getenv and are not
# validated here; an unset variable is passed through as None.
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# REST API client; `api` is not referenced anywhere else in this file.
# NOTE(review): `wait_on_rate_limit_notify` is a tweepy 3.x option that was
# removed in tweepy 4 -- confirm the pinned tweepy version.
api = tweepy.API(auth,
                 wait_on_rate_limit=True,
                 wait_on_rate_limit_notify=True)

# Pub/Sub publisher and the fully qualified path of the destination topic.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC)
  46.  
  47.  
  48. # Method to push messages to pubsub
  49. def write_to_pubsub(data):
  50.     """
  51.  
  52.    :param data:
  53.    :return:
  54.    """
  55.     try:
  56.         if data["lang"] == "en":
  57.             publisher.publish(topic_path, data=json.dumps({
  58.                 "text": data["text"],
  59.                 "user_id": data["user_id"],
  60.                 "id": data["id"],
  61.                 "posted_at": datetime.datetime.fromtimestamp(
  62.                     data["created_at"]).strftime('%Y-%m-%d %H:%M:%S')
  63.             }).encode("utf-8"), tweet_id=str(data["id"]).encode("utf-8"))
  64.     except Exception as e:
  65.         raise
  66.  
  67.  
  68. # Method to format a tweet from tweepy
  69. def reformat_tweet(tweet):
  70.     """
  71.  
  72.    :param tweet:
  73.    :return:
  74.    """
  75.     x = tweet
  76.  
  77.     processed_doc = {
  78.         "id": x["id"],
  79.         "lang": x["lang"],
  80.         "retweeted_id": x["retweeted_status"][
  81.             "id"] if "retweeted_status" in x else None,
  82.         "favorite_count": x["favorite_count"] if "favorite_count" in x else 0,
  83.         "retweet_count": x["retweet_count"] if "retweet_count" in x else 0,
  84.         "coordinates_latitude": x["coordinates"]["coordinates"][0] if x[
  85.             "coordinates"] else 0,
  86.         "coordinates_longitude": x["coordinates"]["coordinates"][0] if x[
  87.             "coordinates"] else 0,
  88.         "place": x["place"]["country_code"] if x["place"] else None,
  89.         "user_id": x["user"]["id"],
  90.         "created_at": time.mktime(
  91.             time.strptime(x["created_at"], "%a %b %d %H:%M:%S +0000 %Y"))
  92.     }
  93.  
  94.     if x["entities"]["hashtags"]:
  95.         processed_doc["hashtags"] = [
  96.             {"text": y["text"], "startindex": y["indices"][0]} for y in
  97.             x["entities"]["hashtags"]]
  98.     else:
  99.         processed_doc["hashtags"] = []
  100.  
  101.     if x["entities"]["user_mentions"]:
  102.         processed_doc["usermentions"] = [
  103.             {"screen_name": y["screen_name"], "startindex": y["indices"][0]} for
  104.             y in
  105.             x["entities"]["user_mentions"]]
  106.     else:
  107.         processed_doc["usermentions"] = []
  108.  
  109.     if "extended_tweet" in x:
  110.         processed_doc["text"] = x["extended_tweet"]["full_text"]
  111.     elif "full_text" in x:
  112.         processed_doc["text"] = x["full_text"]
  113.     else:
  114.         processed_doc["text"] = x["text"]
  115.  
  116.     return processed_doc
  117.  
  118.  
  119. # Custom listener class
  120. class Listener(StreamListener):
  121.     """ A listener handles tweets that are received from the stream.
  122.    This is a basic listener that just pushes tweets to Google Cloud PubSub
  123.    """
  124.  
  125.     def __init__(self):
  126.         super(Listener, self).__init__()
  127.         self._counter = 0
  128.  
  129.     def on_status(self, status):
  130.         write_to_pubsub(reformat_tweet(status._json))
  131.         self._counter += 1
  132.         logging.info(status._json)
  133.         return True
  134.  
  135.     def on_error(self, status_code):
  136.         logging.error('Encountered error with status code:', status_code,
  137.                       sys.stderr)
  138.         if status_code == 420:
  139.             logging.error("Rate limit active")
  140.             return False
  141.  
  142.  
  143. def start_stream(stream, **kwargs):
  144.     """
  145.  
  146.    :param stream:
  147.    :param kwargs:
  148.    :return:
  149.    """
  150.     try:
  151.         stream.filter(**kwargs)
  152.     except urllib3.exceptions.ReadTimeoutError:
  153.         stream.disconnect()
  154.         logging.exception("ReadTimeoutError exception")
  155.         start_stream(stream, **kwargs)
  156.     except Exception as e:
  157.         stream.disconnect()
  158.         logging.exception("Fatal exception. Consult logs.", e)
  159.         start_stream(stream, **kwargs)
  160.  
  161.  
  162. def run():
  163.     listener = Listener()
  164.     twitter_stream = tweepy.Stream(auth, listener=listener,
  165.                                    tweet_mode='extended')
  166.     try:
  167.         logging.info('Start Twitter streaming...')
  168.         start_stream(twitter_stream, track=_HASHTAGS)
  169.     except KeyboardInterrupt:
  170.         logging.exception("Stopped.")
  171.     finally:
  172.         logging.info('Done.')
  173.         twitter_stream.disconnect()
  174.  
  175.  
  176. if __name__ == '__main__':
  177.     logging.getLogger().setLevel(logging.INFO)
  178.     run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement