Guest User

Untitled

a guest
Jan 16th, 2018
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.32 KB | None | 0 0
  1. package de.hska.iwi.vsys.bdelab.streaming;
  2.  
  3. import org.apache.storm.topology.BasicOutputCollector;
  4. import org.apache.storm.topology.OutputFieldsDeclarer;
  5. import org.apache.storm.tuple.Fields;
  6. import org.apache.storm.tuple.Tuple;
  7. import org.apache.storm.tuple.Values;
  8.  
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /*
  12. START SERVICES
  13. /usr/local/opt/kafka_2.11-1.0.0/bin/zookeeper-server-start.sh /usr/local/opt/kafka_2.11-1.0.0/config/zookeeper.properties
  14. /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-server-start.sh /usr/local/opt/kafka_2.11-1.0.0/config/server.properties
  15.  
  16. CREATE TOPIC
  17. /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sentence_yuar1011
  18.  
  19. CHECK IT'S LIVE
  20. /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
  21.  
  22. SEND MESSAGES
  23. /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sentence_yuar1011
  24.  
  25. RECEIVE MESSAGES
  26. /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sentence_yuar1011 --from-beginning
  27. */
  28.  
  29.  
  30. public class WordCountBolt extends NoisyBolt {
  31. private Map<String, Integer> counts = new HashMap<>();
  32.  
  33. @Override
  34. public void execute(Tuple tuple, BasicOutputCollector collector) {
  35. System.out.println(getIDs() + " executes tuple: " + tuple);
  36.  
  37. String word = tuple.getString(0);
  38. Integer count = counts.get(word);
  39. if (count == null) {
  40. count = 0;
  41. }
  42. count++;
  43. counts.put(word, count);
  44.  
  45. Values values = new Values(word, count);
  46. System.out.println(getIDs() + " result values: " + values);
  47.  
  48. collector.emit(values);
  49.  
  50.  
  51. /*
  52. ADD bolt for trimming your urls
  53. String url = tuple.getString(0);
  54. String time = tuple.getString(1);
  55. url = trimUrl(url);
  56. collector.emit(new Values(url, time));
  57.  
  58. public static String trimUrl(String url) {
  59. return url.split("/")[2];
  60. }
  61.  
  62. ADD bolt for bucketing your epoch-times
  63. String url = tuple.getString(0);
  64. long time = Long.parseLong( tuple.getString(1), 10);
  65. String timeBucket = convertToHour(time);
  66. collector.emit(new Values(url, timeBucket));
  67.  
  68. public static String convertToHour(long timestamp) {
  69. long timestampInMs = timestamp * 1000;
  70. Date date = new Date(timestampInMs);
  71. Format format = new SimpleDateFormat("dd.MM.yyyy/HH");
  72. return format.format(date);
  73. }
  74.  
  75. THIS bolt for counting
  76. String url = tuple.getString(0);
  77. String time = tuple.getString(1);
  78.  
  79. String compositeKey = url + " " + time;
  80. Integer count = counts.get(compositeKey);
  81. if (count == null) {
  82. count = 0;
  83. }
  84. count++;
  85. counts.put(compositeKey, count);
  86.  
  87. Values values = new Values(compositeKey, count);
  88. System.out.println(getIDs() + " result values: " + values);
  89.  
  90. collector.emit(values);
  91. */
  92. }
  93.  
  94. @Override
  95. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  96. declarer.declare(new Fields("word", "count"));
  97. }
  98. }
Add Comment
Please, Sign In to add comment