Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ==================================================================================================================
- hashtagspout.py
- ==================================================================================================================
- import sys
- import time
- import random
- import logging
- from twitter import *
- from petrel import storm
- from petrel.emitter import Spout
# Module-wide logger for the spout; the debug record is emitted when the module is imported.
log = logging.getLogger(name='hashtagspout')
log.debug('hashtagspout started')
class HashtagSpout(Spout):
    """Spout that emits canned hashtag records for exercising the topology.

    Each emitted tuple is ``[tag, date]``:

    * ``tag`` is one of the lists in ``t``. Each list holds Twitter-style
      hashtag entity dicts with ``'indices'`` and ``'text'`` keys, and one
      list is picked at random per tuple.
    * ``date`` is a fixed timestamp string.

    SplitHashtagBolt reads ``values[0]`` as that list and ``values[1]`` as
    the date.
    """

    # Sample hashtag entity lists; nextTuple() picks one at random.
    t = [
        [{'indices': [96, 101], 'text': 'cute'}],
        [{'indices': [96, 101], 'text': 'beauty'}],
        [{'indices': [96, 101], 'text': 'nice'}],
    ]

    def __init__(self):
        super(HashtagSpout, self).__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        """Return the output field names, in the order nextTuple() emits them."""
        return ['tag', 'date']

    def nextTuple(self):
        """Emit a single ``[tag, date]`` tuple with a randomly chosen hashtag list.

        NOTE(review): this emits on every call and never sleeps (``time`` is
        imported by the module but unused). Confirm that an unthrottled
        stream is intended.
        """
        # Log instead of writing raw text to stdout. The original wrote a list
        # to sys.stdout, which raises TypeError, and stray stdout text can
        # interfere with the Storm multilang channel.
        log.debug('started tuple')
        # The original indexed with len(t), where `t` is an undefined global
        # (NameError). Pick from the class attribute instead.
        tag = random.choice(self.t)
        date = 'Tue Apr 01 05:27:35 +0000 2014'
        log.debug('Sending HashTag For Processing: %s', tag)
        storm.emit([tag, date])
def run():
    """Construct a HashtagSpout and start it by calling its run() method."""
    spout = HashtagSpout()
    spout.run()
- ==================================================================================================================
- splithashtag.py
- ==================================================================================================================
- import sys
- import logging
- from petrel import storm
- from petrel.emitter import BasicBolt
# Module-wide logger for the split bolt; the debug record is emitted when the module is imported.
log = logging.getLogger(name='splithashtag')
log.debug('splithashtag Started')
class SplitHashtagBolt(BasicBolt):
    """Bolt that fans a list of hashtag entities out into one tuple per tag.

    Input tuple layout, as emitted by HashtagSpout:

    * ``values[0]`` is an iterable of dicts, each having a ``'text'`` key.
    * ``values[1]`` is a date string.

    For every entity it emits ``[date, tag]``, matching declareOutputFields().
    """

    def __init__(self):
        super(SplitHashtagBolt, self).__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        """Return the output field names, in the order process() emits them."""
        return ['date', 'tag']

    def process(self, tup):
        """Emit ``[date, tag]`` for each hashtag entity in ``tup.values[0]``."""
        log.debug('SplitHashtagBolt.process() started with: %s', tup)
        entities = tup.values[0]
        # Every entity in this tuple shares one date, so read it only once.
        date = tup.values[1]
        for entity in entities:
            tag = entity['text']
            # Use logging rather than raw sys.stdout writes, which can
            # interfere with the Storm multilang channel.
            log.debug('SplitHashtagBolt.process() emitting: %s', tag)
            storm.emit([date, tag])
def run():
    """Construct a SplitHashtagBolt and start it by calling its run() method."""
    bolt = SplitHashtagBolt()
    bolt.run()
- ==================================================================================================================
- hashtagcount.py
- ==================================================================================================================
- import sys
- import redis
- import logging
- from collections import defaultdict
- from petrel import storm
- from petrel.emitter import BasicBolt
# Module-wide logger for the count bolt; the debug record is emitted when the module is imported.
log = logging.getLogger(name='hashtagcount')
log.debug('hashtagcount started')
# Redis client created at import time and used by HashtagCountBolt.process().
r = redis.StrictRedis(db=0, port=6379, host='localhost')
class HashtagCountBolt(BasicBolt):
    """Bolt that keeps a running count per hashtag and mirrors it to Redis.

    Input tuple layout, as declared by SplitHashtagBolt:

    * ``values[0]`` is the date.
    * ``values[1]`` is the tag.

    Counts are held in memory on this bolt instance (``self._count``), so
    they start from zero whenever a new instance is created.
    """

    def __init__(self):
        super(HashtagCountBolt, self).__init__(script=__file__)
        # tag -> number of tuples this instance has seen for that tag.
        self._count = defaultdict(int)

    @classmethod
    def declareOutputFields(cls):
        """Return the output field names, in the order process() emits them."""
        return ['tag', 'count']

    def process(self, tup):
        """Increment the tag's count, emit ``[tag, count]``, and record it in Redis."""
        log.debug("HashtagCountBolt.process() started with: %s", tup)
        date = tup.values[0]
        tag = tup.values[1]
        # The original used `=+ 1`, which assigns +1 and so never counts past 1.
        self._count[tag] += 1
        count = self._count[tag]
        storm.emit([tag, count])
        # Key "tag:<tag>" holds the latest "<date>, <count>" snapshot.
        r.set("tag:%s" % tag, "%s, %s" % (date, count))
        # The list under the bare tag key accumulates "{<date>:<count>}" entries.
        r.rpush("%s" % tag, "{%s:%s}" % (date, count))
        # Use logging rather than raw sys.stdout writes, which can interfere
        # with the Storm multilang channel.
        log.debug("data added to redis: %s", tag)
def run():
    """Construct a HashtagCountBolt and start it by calling its run() method."""
    bolt = HashtagCountBolt()
    bolt.run()
- ======================================================================================================
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement