Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package de.hska.iwi.vsys.bdelab.streaming;
- import org.apache.storm.topology.BasicOutputCollector;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.tuple.Fields;
- import org.apache.storm.tuple.Tuple;
- import org.apache.storm.tuple.Values;
- import java.util.HashMap;
- import java.util.Map;
- /*
- START SERVICES
- /usr/local/opt/kafka_2.11-1.0.0/bin/zookeeper-server-start.sh /usr/local/opt/kafka_2.11-1.0.0/config/zookeeper.properties
- /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-server-start.sh /usr/local/opt/kafka_2.11-1.0.0/config/server.properties
- CREATE TOPIC
- /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sentence_yuar1011
- CHECK IT'S LIVE
- /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
- SEND MESSAGES
- /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sentence_yuar1011
- RECEIVE MESSAGES
- /usr/local/opt/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sentence_yuar1011 --from-beginning
- */
- public class WordCountBolt extends NoisyBolt {
- private Map<String, Integer> counts = new HashMap<>();
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- System.out.println(getIDs() + " executes tuple: " + tuple);
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if (count == null) {
- count = 0;
- }
- count++;
- counts.put(word, count);
- Values values = new Values(word, count);
- System.out.println(getIDs() + " result values: " + values);
- collector.emit(values);
- /*
- ADD bolt for trimming your urls
- String url = tuple.getString(0);
- String time = tuple.getString(1);
- url = trimUrl(url);
- collector.emit(new Values(url, time));
- public static String trimUrl(String url) {
- return url.split("/")[2];
- }
- ADD bolt for bucketing your epoch-times
- String url = tuple.getString(0);
- long time = Long.parseLong( tuple.getString(1), 10);
- String timeBucket = convertToHour(time);
- collector.emit(new Values(url, timeBucket));
- public static String convertToHour(long timestamp) {
- long timestampInMs = timestamp * 1000;
- Date date = new Date(timestampInMs);
- Format format = new SimpleDateFormat("dd.MM.yyyy/HH");
- return format.format(date);
- }
- THIS bolt for counting
- String url = tuple.getString(0);
- String time = tuple.getString(1);
- String compositeKey = url + " " + time;
- Integer count = counts.get(compositeKey);
- if (count == null) {
- count = 0;
- }
- count++;
- counts.put(compositeKey, count);
- Values values = new Values(compositeKey, count);
- System.out.println(getIDs() + " result values: " + values);
- collector.emit(values);
- */
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
Add Comment
Please, Sign In to add comment