Guest User

Untitled

a guest
Oct 23rd, 2017
125
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /**
  2.  * Created by fwang1 on 3/25/15.
  3.  */
  4. module.exports = function(RED) {
  5.     /*
  6.      *   Kafka Producer
  7.      *   Parameters:
  8.      - topics
  9.      - zkquorum(example: zkquorum = “[host]:2181")
  10.      */
  11.     function kafkaNodeK(config) {
  12.         RED.nodes.createNode(this,config);
  13.         var topic = config.topic;
  14.         var clusterZookeeper = config.zkquorum;
  15.         var key = config.key;
  16.         var debug = (config.debug == "debug");
  17.         var node = this;
  18.         var kafka = require('kafka-node');
  19.         var HighLevelProducer = kafka.HighLevelProducer;
  20.         var Client = kafka.Client;
  21.         var topics = config.topics;
  22.         var client = new Client(clusterZookeeper);
  23.  
  24.         try {
  25.             this.on("input", function(msg) {
  26.                 var payloads = [];
  27.  
  28.                 // check if multiple topics
  29.                 if (topics.indexOf(",") > -1){
  30.                     var topicArry = topics.split(',');
  31.  
  32.                     for (i = 0; i < topicArry.length; i++) {
  33.                         payloads.push({topic: topicArry[i], messages: msg.payload});
  34.                     }
  35.                 }
  36.                 else {
  37.                     KeyedMessage = kafka.KeyedMessage;
  38.             //json = JSON.parse(msg.payload)
  39.                     //km = new KeyedMessage(key,json);
  40.             km = new KeyedMessage(key,msg.payload);
  41.                     payloads = [{topic: topics, messages: km}];
  42.                 }
  43.  
  44.                 producer.send(payloads, function(err, data){
  45.                     if (err){
  46.                         node.error(err);
  47.                     }
  48.                     node.log("Message Sent: " + data);
  49.                 });
  50.             });
  51.         }
  52.         catch(e) {
  53.             node.error(e);
  54.         }
  55.         var producer = new HighLevelProducer(client);
  56.         this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper});
  57.     }
  58.  
  59.     RED.nodes.registerType("kafkaKey",kafkaNodeK);
  60.  
  61. };
Advertisement
Add Comment
Please, Sign In to add comment