Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- var mqtt = require("mqtt");
- var util = require("util");
- var isUtf8 = require('is-utf8');
- function matchTopic(ts, t) {
- if (ts == "#") {
- return true;
- }
- var re = new RegExp("^" + ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g, "\\$1").replace(/\+/g, "[^/]+").replace(/\/#$/, "(\/.*)?") + "$");
- return re.test(t);
- }
- function MQTTBroker() {
- // Configuration options passed by Node Red
- this.broker = "localhost";
- this.port = "1883";
- this.clientid = "";
- this.usetls = false;
- this.verifyservercert = undefined;
- this.compatmode = true;
- this.keepalive = "60";
- this.cleansession = "true";
- this.birthTopic = null;
- this.birthPayload = "";
- this.birthQos = 0;
- this.birthRetain = "true";
- this.willTopic = null;
- this.willPayload = "";
- this.willQos = 0;
- this.willRetain = "true";
- // Config broker state
- this.brokerurl = "";
- this.connected = false;
- this.connecting = false;
- this.closing = false;
- this.options = {};
- this.queue = [];
- this.subscriptions = {};
- if (this.birthTopic) {
- this.birthMessage = {
- topic: this.birthTopic,
- payload: this.birthPayload || "",
- qos: Number(this.birthQos || 0),
- retain: this.birthRetain === true
- };
- }
- if (this.credentials) {
- this.username = this.credentials.user;
- this.password = this.credentials.password;
- }
- // If the config broker is missing certain options (it was probably deployed prior to an update to the broker code),
- // select/generate sensible options for the new fields
- if (typeof this.usetls === 'undefined') {
- this.usetls = false;
- }
- if (typeof this.compatmode === 'undefined') {
- this.compatmode = true;
- }
- if (typeof this.verifyservercert === 'undefined') {
- this.verifyservercert = false;
- }
- if (typeof this.keepalive === 'undefined') {
- this.keepalive = 60;
- } else if (typeof this.keepalive === 'string') {
- this.keepalive = Number(this.keepalive);
- }
- if (typeof this.cleansession === 'undefined') {
- this.cleansession = true;
- }
- // Create the URL to pass in to the MQTT.js library
- if (this.brokerurl === "") {
- if (this.usetls) {
- this.brokerurl = "mqtts://";
- } else {
- this.brokerurl = "mqtt://";
- }
- if (this.broker !== "") {
- this.brokerurl = this.brokerurl + this.broker + ":" + this.port;
- } else {
- this.brokerurl = this.brokerurl + "localhost:1883";
- }
- }
- // Build options for passing to the MQTT.js API
- this.options.clientId = this.clientid || 'mqtt_' + (1 + Math.random() * 4294967295).toString(16);
- this.options.username = this.username;
- this.options.password = this.password;
- this.options.keepalive = this.keepalive;
- this.options.clean = this.cleansession;
- this.options.reconnectPeriod = this.reconnectPeriod || 5000; //RED Settings
- if (this.compatmode === true) {
- this.options.protocolId = 'MQIsdp';
- this.options.protocolVersion = 3;
- }
- if (this.willTopic) {
- this.options.will = {
- topic: this.willTopic,
- payload: this.willPayload || "",
- qos: Number(this.willQos || 0),
- retain: this.willRetain == "true" || n.willRetain === true
- };
- }
- // Define functions called by MQTT in and out brokers
- var broker = this;
- this.users = {};
- this.connect = function () {
- if (!broker.connected && !broker.connecting) {
- broker.connecting = true;
- broker.client = mqtt.connect(broker.brokerurl, broker.options);
- broker.client.setMaxListeners(0);
- // Register successful connect or reconnect handler
- broker.client.on('connect', function () {
- broker.connecting = false;
- broker.connected = true;
- console.log(broker.connected);
- // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
- broker.client.removeAllListeners('message');
- // Re-subscribe to stored topics
- for (var s in broker.subscriptions) {
- if (broker.subscriptions.hasOwnProperty(s)) {
- var topic = s;
- var qos = 0;
- for (var r in broker.subscriptions[s]) {
- if (broker.subscriptions[s].hasOwnProperty(r)) {
- qos = Math.max(qos, broker.subscriptions[s][r].qos);
- broker.client.on('message', broker.subscriptions[s][r].handler);
- }
- }
- var options = {qos: qos};
- broker.client.subscribe(topic, options);
- }
- }
- // Send any birth message
- if (broker.birthMessage) {
- broker.publish(broker.birthMessage);
- }
- });
- broker.client.on("reconnect", function () {
- // Reconnect
- });
- // Register disconnect handlers
- broker.client.on('close', function () {
- if (broker.connected) {
- broker.connected = false;
- } else if (broker.connecting) {
- }
- });
- // Register connect error handler
- broker.client.on('error', function (error) {
- if (broker.connecting) {
- broker.client.end();
- broker.connecting = false;
- }
- });
- }
- };
- this.subscribe = function (topic, qos, callback, ref) {
- ref = ref || 0;
- broker.subscriptions[topic] = broker.subscriptions[topic] || {};
- var sub = {
- topic: topic,
- qos: qos,
- handler: function (mtopic, mpayload, mpacket) {
- if (matchTopic(topic, mtopic)) {
- callback(mtopic, mpayload, mpacket);
- }
- },
- ref: ref
- };
- broker.subscriptions[topic][ref] = sub;
- if (broker.connected) {
- broker.client.on('message', sub.handler);
- var options = {};
- options.qos = qos;
- broker.client.subscribe(topic, options);
- }
- };
- this.unsubscribe = function (topic, ref) {
- ref = ref || 0;
- var sub = broker.subscriptions[topic];
- if (sub) {
- if (sub[ref]) {
- broker.client.removeListener('message', sub[ref].handler);
- delete sub[ref];
- }
- if (Object.keys(sub).length === 0) {
- delete broker.subscriptions[topic];
- if (broker.connected) {
- broker.client.unsubscribe(topic);
- }
- }
- }
- };
- this.publish = function (msg) {
- if (broker.connected) {
- if (!Buffer.isBuffer(msg.payload)) {
- if (typeof msg.payload === "object") {
- msg.payload = JSON.stringify(msg.payload);
- } else if (typeof msg.payload !== "string") {
- msg.payload = "" + msg.payload;
- }
- }
- var options = {
- qos: msg.qos || 0,
- retain: msg.retain || false
- };
- broker.client.publish(msg.topic, msg.payload, options, function (err) {
- return err
- });
- }
- };
- }
- topic = "chat/#";
- qos = 2;
- if (isNaN(qos) || qos < 0 || qos > 2) {
- qos = 2;
- }
- brokerConn = new MQTTBroker();
- if (brokerConn) {
- if (topic) {
- brokerConn.connect();
- brokerConn.subscribe(topic, qos, function (topic, payload, packet) {
- if (isUtf8(payload)) {
- payload = payload.toString();
- }
- var msg = {topic: topic, payload: payload, qos: packet.qos, retain: packet.retain};
- if ((brokerConn.broker === "localhost") || (brokerConn.broker === "127.0.0.1")) {
- msg._topic = topic;
- }
- console.log(msg);
- }, topic);
- }
- else {
- }
- } else {
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement