Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from tweepy.streaming import StreamListener
- from tweepy import OAuthHandler
- from tweepy import Stream
- from kafka import KafkaClient, SimpleProducer
- import json
- import requests
- import datetime
- import time
- import schedule
- from collections import Counter
import os

# SECURITY(review): the literal values below are API credentials committed in
# plain text. Each one can now be supplied through the environment variable
# named on its line; the literal is kept only as a fallback so existing
# deployments keep working. Rotate these credentials and delete the literal
# fallbacks once the environment variables are in place.

# Not referenced by the visible code; presumably a weather-API key -- confirm.
WEATHER_KEY = os.environ.get('WEATHER_KEY') or '6ae0ae17ad4a704d053d600d126b2b51'

# Twitter application credentials handed to OAuthHandler(...).
CONSUMER_KEY = os.environ.get('TWITTER_CONSUMER_KEY') or 'r4MfeaR7f0u8ZN4UJhs4iE6B0'
CONSUMER_SECRET = (
    os.environ.get('TWITTER_CONSUMER_SECRET')
    or 'HpqlyCXux9YafelBkVdTTAAx4EcwnBwT22FFHXT7JhTgSFv46f'
)

# Twitter user credentials handed to auth.set_access_token(...).
ACCESS_TOKEN = (
    os.environ.get('TWITTER_ACCESS_TOKEN')
    or '1192462382611742720-CclEg4ez8mIyErD4XdbBhYYFFTowhZ'
)
ACCESS_SECRET = (
    os.environ.get('TWITTER_ACCESS_SECRET')
    or 'SzlbSZUF9AMui5OtrqVX144brAhPL0tT42E0oglPNhBTZ'
)
class TweetStreamer:
    """Open a location-filtered Twitter stream and feed it to StdOutListener.

    Authentication uses the module-level CONSUMER_KEY / CONSUMER_SECRET and
    ACCESS_TOKEN / ACCESS_SECRET constants.
    """

    # Bounding box used when the caller does not supply one. The four numbers
    # are passed unchanged to Stream.filter(locations=...).
    # NOTE(review): the values look like longitude/latitude corners around
    # New York City -- confirm against the Twitter filter documentation.
    DEFAULT_LOCATIONS = (-74.1687, 40.5722, -73.8062, 40.9467)

    def stream_tweets(self, locations=None):
        """Authenticate and start streaming tweets for a bounding box.

        Args:
            locations: Optional sequence of bounding-box coordinates to pass
                to ``Stream.filter``. When omitted, ``DEFAULT_LOCATIONS`` is
                used, which matches the previously hard-coded box.
        """
        if locations is None:
            locations = self.DEFAULT_LOCATIONS

        listener = StdOutListener()
        auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

        stream = Stream(auth, listener)
        # Pass a fresh list, as the original code did, so the class-level
        # default can never be mutated by the callee.
        stream.filter(locations=list(locations))
class StdOutListener(StreamListener):
    """Stream listener that republishes every received payload to Kafka.

    Payloads are parsed as JSON, re-serialised, and sent UTF-8 encoded to the
    ``tweets`` topic through a SimpleProducer connected to ``localhost:9092``.
    """

    def __init__(self):
        # Run the base-class initialiser first so any state the tweepy
        # listener sets up exists before our own attributes are added.
        super(StdOutListener, self).__init__()
        self.kafka = KafkaClient("localhost:9092")
        self.producer = SimpleProducer(self.kafka)

    def on_data(self, data):
        """Forward one raw stream payload to the ``tweets`` Kafka topic.

        Args:
            data: JSON text received from the stream.

        Returns:
            Always ``True``, whether or not the payload could be published;
            failures are reported on stdout only.
        """
        try:
            # The loads/dumps round trip rejects malformed JSON and
            # normalises the serialisation before it is published.
            curr_tweet = json.loads(data)
            payload = json.dumps(curr_tweet)
            self.producer.send_messages('tweets', payload.encode('utf-8'))
            print("Successfully sent message to kafka")
        except Exception as e:
            # Deliberately best-effort: one bad payload must not stop the
            # stream. Only Exception is caught so that KeyboardInterrupt and
            # SystemExit still terminate the process.
            print("Error on_data %s" % str(e))
        return True

    def on_error(self, status):
        """Print the error status reported by the stream."""
        print(status)
# Script entry point: start streaming only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    TweetStreamer().stream_tweets()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement