Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- 7609 [Thread-14-word_spout-executor[2 2]] INFO o.a.s.s.ShellSpout - ShellLog pid:35761, name:word_spout 2017-02-20 15:30:33,070 - pystorm.component.word_spout - acking when I shouldnt tup_id: 3219
- ...
- 7611 [Thread-21] INFO o.a.s.t.ShellBolt - ShellLog pid:35760, name:count_bolt 2017-02-20 15:30:33,072 - pystorm.component.count_bolt - BOLT: receiving tup_id/count: 3219
- from itertools import cycle
- from streamparse import Spout
- from streamparse import ReliableSpout
class WordSpout(ReliableSpout):
    """Spout that emits an increasing counter, one value per ``next_tuple`` call.

    Each emitted tuple carries the current counter (as a string) in its
    single output field and uses the same counter (as an int) as its
    ``tup_id``. ``ack`` and ``fail`` only log the id they receive.
    """

    # Declared output field name; note the value actually emitted is the
    # stringified counter, not one of the words.
    outputs = ['word']

    def initialize(self, stormconf, context):
        """Set up the endless word iterator and reset the tuple counter."""
        self.words = cycle(('dog', 'cat', 'zebra', 'elephant'))
        self.count = 0

    def next_tuple(self):
        """Advance the counter and emit it, tagged with itself as ``tup_id``."""
        self.count = self.count + 1
        # The word iterator is advanced on every call, but the word itself
        # is not part of the emitted tuple.
        next(self.words)
        payload = [str(self.count)]
        self.emit(payload, tup_id=self.count)

    def ack(self, tup_id):
        """Log the id of a tuple reported as acknowledged."""
        message = "acking when I shouldnt tup_id: {0}".format(tup_id)
        self.logger.info(message)

    def fail(self, tup_id):
        """Log the id of a tuple reported as failed."""
        message = "failing when I should tup_id: {0}".format(tup_id)
        self.logger.info(message)
- import os
- from collections import Counter
- from streamparse import Bolt
class WordCountBolt(Bolt):
    """Bolt that tallies incoming values and then explicitly fails each tuple.

    Automatic acking and failing are both switched off through the class
    flags below; ``process`` always finishes by calling ``self.fail``.
    """

    auto_ack = False
    auto_fail = False
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        """Create the per-value counter, record the process id, zero the total."""
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        """Add ``inc_by`` to both the tally for ``word`` and the running total."""
        self.counter[word] = self.counter[word] + inc_by
        self.total = self.total + inc_by

    def process(self, tup):
        """Count the tuple's first value, log it, and fail the tuple."""
        word = tup.values[0]

        # "dog" is weighted ten times heavier than any other value.
        if word == "dog":
            weight = 10
        else:
            weight = 1
        self._increment(word, weight)

        # A periodic "counted [N] words [pid=...]" log (every 1000 counted)
        # and the downstream emit of [word, count] are intentionally
        # disabled in this version.
        message = "BOLT: receiving tup_id/count: {0}".format(word)
        self.logger.info(message)

        self.fail(tup)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement