Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * Created by fwang1 on 3/25/15.
- */
- module.exports = function(RED) {
- /*
- * Kafka Producer
- * Parameters:
- - topics
- - zkquorum(example: zkquorum = “[host]:2181")
- */
- function kafkaNodeK(config) {
- RED.nodes.createNode(this,config);
- var topic = config.topic;
- var clusterZookeeper = config.zkquorum;
- var key = config.key;
- var debug = (config.debug == "debug");
- var node = this;
- var kafka = require('kafka-node');
- var HighLevelProducer = kafka.HighLevelProducer;
- var Client = kafka.Client;
- var topics = config.topics;
- var client = new Client(clusterZookeeper);
- try {
- this.on("input", function(msg) {
- var payloads = [];
- // check if multiple topics
- if (topics.indexOf(",") > -1){
- var topicArry = topics.split(',');
- for (i = 0; i < topicArry.length; i++) {
- payloads.push({topic: topicArry[i], messages: msg.payload});
- }
- }
- else {
- KeyedMessage = kafka.KeyedMessage;
- //json = JSON.parse(msg.payload)
- //km = new KeyedMessage(key,json);
- km = new KeyedMessage(key,msg.payload);
- payloads = [{topic: topics, messages: km}];
- }
- producer.send(payloads, function(err, data){
- if (err){
- node.error(err);
- }
- node.log("Message Sent: " + data);
- });
- });
- }
- catch(e) {
- node.error(e);
- }
- var producer = new HighLevelProducer(client);
- this.status({fill:"green",shape:"dot",text:"connected to "+clusterZookeeper});
- }
- RED.nodes.registerType("kafkaKey",kafkaNodeK);
- };
Advertisement
Add Comment
Please, Sign In to add comment