Guest User

Untitled

a guest
Dec 16th, 2018
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.97 KB | None | 0 0
  1. while (true) {
  2. Duration d = Duration.ofMillis(1000);
  3. ConsumerRecords<String, String> records = listenConsumer.poll(d);
  4. l.consume(records, stopwords);
  5. }
  6.  
  7. private void consume(ConsumerRecords<String, String> records, List<String> stopwords) {
  8. for (ConsumerRecord r : records) {
  9. String[] tokens = ((String) r.value()).split("\\s");
  10. HashSet<String> keys = new HashSet<>();
  11. for (String t : tokens) {
  12. if (!stopwords.contains(t.toLowerCase())) {
  13. String key = (String) r.key();
  14. //TODO: change to pipeline commands
  15. j.zincrby(key, 1.0, t);
  16. j.zincrby("global-token-count", 1.0, t);
  17. keys.add(key);
  18. }
  19. }
  20. if(keys.size()>0) {
  21. System.out.println(keys);
  22. String[] a = new String[keys.size()];
  23. j.sadd("movies-list", keys.toArray(a));
  24. }
  25.  
  26. }
  27. }
Add Comment
Please, Sign In to add comment