Advertisement
Guest User

Untitled

a guest
Mar 28th, 2017
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1.  
  // Kafka -> MongoDB bridge: consumes partition 0 of the "benchmark" topic and
  // hands the received messages to storeDataOnMongo() in fixed-size batches.
  var kafka = require("kafka-node");
  var Consumer = kafka.Consumer;
  var Client = kafka.Client;

  // NOTE(review): 2181 is presumably the local ZooKeeper endpoint - confirm.
  var client = new Client('localhost:2181');
  var topics = [
      {topic: "benchmark", partition: 0}
  ];
  // NOTE(review): autoCommit is off and nothing in this part of the file commits
  // offsets explicitly - confirm that re-reading messages after a restart is
  // intended.
  var options = { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 2048 * 2048, encoding: 'utf8' };

  var consumer = new Consumer(client, topics, options);

  // Number of messages accumulated before a batch is handed to storeDataOnMongo().
  var BATCH_SIZE = 10;

  // Messages of the batch currently being filled, and how many it holds.
  var arr = [];
  var ctr = 0;

  var MongoClient = require('mongodb').MongoClient;

  /**
   * Buffers each incoming message; once BATCH_SIZE messages have been collected
   * the whole batch is passed to storeDataOnMongo() and the buffer starts over.
   *
   * @param {Object} message - message object delivered by the consumer.
   */
  consumer.on('message', function (message) {
      arr.push(message);
      ctr++;

      if (ctr === BATCH_SIZE) {
          storeDataOnMongo(arr);
          // Start a fresh array instead of emptying the old one: the batch that
          // was just handed off is still referenced by storeDataOnMongo().
          ctr = 0;
          arr = [];
      }
  });

  // An EventEmitter with no 'error' listener throws when 'error' is emitted,
  // which would take the whole process down; report the failure instead.
  consumer.on('error', function (err) {
      console.error('Kafka consumer error:', err);
  });
  31.  
  32.  
  33. function storeDataOnMongo(data) {
  34.  
  35.         // Connect to the db
  36.         MongoClient.connect("mongodb://localhost:27017/benchmark", function (err, db) {
  37.  
  38.                 if(err) throw err;
  39.  
  40.                 for (var i = 0; i < data.length; i++) {
  41.  
  42.                         db.collection(data[i].topic, function (err, collection) {
  43.  
  44. "index.js" 50L, 1093C
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement