Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- while (true) {
- Duration d = Duration.ofMillis(1000);
- ConsumerRecords<String, String> records = listenConsumer.poll(d);
- l.consume(records, stopwords);
- }
- private void consume(ConsumerRecords<String, String> records, List<String> stopwords) {
- for (ConsumerRecord r : records) {
- String[] tokens = ((String) r.value()).split("\\s");
- HashSet<String> keys = new HashSet<>();
- for (String t : tokens) {
- if (!stopwords.contains(t.toLowerCase())) {
- String key = (String) r.key();
- //TODO: change to pipeline commands
- j.zincrby(key, 1.0, t);
- j.zincrby("global-token-count", 1.0, t);
- keys.add(key);
- }
- }
- if(keys.size()>0) {
- System.out.println(keys);
- String[] a = new String[keys.size()];
- j.sadd("movies-list", keys.toArray(a));
- }
- }
- }
Add Comment
Please, Sign In to add comment