Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- var kafka = require("kafka-node"); // Script purpose: consume Kafka topic "benchmark" and hand messages to storeDataOnMongo in batches of 10
- var Consumer = kafka.Consumer;
- var Client = kafka.Client;
- var client = new Client('localhost:2181'); // NOTE(review): presumably the ZooKeeper connection string (2181) — confirm against the kafka-node version in use
- var topics = [
- {topic: "benchmark", partition: 0} // only partition 0 of "benchmark" is subscribed
- ];
- var options = { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 2048 * 2048, encoding: 'utf8' }; // fetchMaxBytes = 4194304; autoCommit is off and no explicit commit appears in the visible code — TODO confirm offsets are committed elsewhere
- var consumer = new Consumer(client, topics, options);
- var arr = []; // buffer of messages received since the last hand-off
- var ctr = 0; // number of messages currently in arr
- var MongoClient = require('mongodb').MongoClient; // used by storeDataOnMongo below
- consumer.on('message', function (message) { // runs once per message emitted by the consumer
- //var entry = [message.topic, message.key.toString(), message.value];
- arr.push(message); // the whole message object is buffered, not just its value
- ctr++;
- // wait until 10 new messages are consumed
- if (ctr == 10) { // ctr is always a number here, so the loose == behaves like ===
- storeDataOnMongo(arr); // hand the full batch over; in the visible code a partial batch (< 10) is never flushed
- // reset ctr and start a fresh array (reassigned rather than emptied, so the batch just handed off is left intact)
- ctr = 0;
- arr = [];
- }
- });
- function storeDataOnMongo(data) {
- // Connect to the db
- MongoClient.connect("mongodb://localhost:27017/benchmark", function (err, db) {
- if(err) throw err;
- for (var i = 0; i < data.length; i++) {
- db.collection(data[i].topic, function (err, collection) {
- "index.js" 50L, 1093C
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement