Advertisement
Guest User

Untitled

a guest
Feb 24th, 2017
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.85 KB | None | 0 0
  1. 7609 [Thread-14-word_spout-executor[2 2]] INFO o.a.s.s.ShellSpout - ShellLog pid:35761, name:word_spout 2017-02-20 15:30:33,070 - pystorm.component.word_spout - acking w
  2. hen I shouldnt tup_id: 3219
  3.  
  4. ...
  5.  
  6. 7611 [Thread-21] INFO o.a.s.t.ShellBolt - ShellLog pid:35760, name:count_bolt 2017-02-20 15:30:33,072 - pystorm.component.count_bolt - BOLT: receiving tup_id/count: 3219
  7.  
  8. from itertools import cycle
  9.  
  10. from streamparse import Spout
  11. from streamparse import ReliableSpout
  12.  
  13.  
  14. class WordSpout(ReliableSpout):
  15. outputs = ['word']
  16.  
  17. def initialize(self, stormconf, context):
  18. self.words = cycle(['dog', 'cat', 'zebra', 'elephant'])
  19. self.count = 0
  20.  
  21. def next_tuple(self):
  22. self.count += 1
  23. word = next(self.words)
  24. self.emit([str(self.count)], tup_id=self.count)
  25.  
  26. def ack(self, tup_id):
  27. self.logger.info("acking when I shouldnt tup_id: {0}".format(tup_id))
  28.  
  29. def fail(self, tup_id):
  30. self.logger.info("failing when I should tup_id: {0}".format(tup_id))
  31.  
  32. import os
  33. from collections import Counter
  34.  
  35. from streamparse import Bolt
  36.  
  37.  
  38. class WordCountBolt(Bolt):
  39. auto_ack = False
  40. auto_fail = False
  41.  
  42. outputs = ['word', 'count']
  43.  
  44. def initialize(self, conf, ctx):
  45. self.counter = Counter()
  46. self.pid = os.getpid()
  47. self.total = 0
  48.  
  49. def _increment(self, word, inc_by):
  50. self.counter[word] += inc_by
  51. self.total += inc_by
  52.  
  53. def process(self, tup):
  54. word = tup.values[0]
  55. self._increment(word, 10 if word == "dog" else 1)
  56. # if self.total % 1000 == 0:
  57. # self.logger.info("counted [{:,}] words [pid={}]".format(self.total,
  58. # self.pid))
  59. self.logger.info("BOLT: receiving tup_id/count: {0}".format(word))
  60. # self.emit([word, self.counter[word]])
  61. self.fail(tup)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement