Advertisement
Guest User

Untitled

a guest
Jun 11th, 2017
106
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.19 KB | None | 0 0
  1. var mqtt = require("mqtt");
  2. var util = require("util");
  3. var isUtf8 = require('is-utf8');
  4.  
  5. function matchTopic(ts, t) {
  6. if (ts == "#") {
  7. return true;
  8. }
  9. var re = new RegExp("^" + ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g, "\\$1").replace(/\+/g, "[^/]+").replace(/\/#$/, "(\/.*)?") + "$");
  10. return re.test(t);
  11. }
  12.  
  13. function MQTTBroker() {
  14. // Configuration options passed by Node Red
  15. this.broker = "localhost";
  16. this.port = "1883";
  17. this.clientid = "";
  18. this.usetls = false;
  19. this.verifyservercert = undefined;
  20. this.compatmode = true;
  21. this.keepalive = "60";
  22. this.cleansession = "true";
  23.  
  24. this.birthTopic = null;
  25. this.birthPayload = "";
  26. this.birthQos = 0;
  27. this.birthRetain = "true";
  28.  
  29. this.willTopic = null;
  30. this.willPayload = "";
  31. this.willQos = 0;
  32. this.willRetain = "true";
  33.  
  34. // Config broker state
  35. this.brokerurl = "";
  36. this.connected = false;
  37. this.connecting = false;
  38. this.closing = false;
  39. this.options = {};
  40. this.queue = [];
  41. this.subscriptions = {};
  42.  
  43. if (this.birthTopic) {
  44. this.birthMessage = {
  45. topic: this.birthTopic,
  46. payload: this.birthPayload || "",
  47. qos: Number(this.birthQos || 0),
  48. retain: this.birthRetain === true
  49. };
  50. }
  51.  
  52. if (this.credentials) {
  53. this.username = this.credentials.user;
  54. this.password = this.credentials.password;
  55. }
  56.  
  57. // If the config broker is missing certain options (it was probably deployed prior to an update to the broker code),
  58. // select/generate sensible options for the new fields
  59. if (typeof this.usetls === 'undefined') {
  60. this.usetls = false;
  61. }
  62. if (typeof this.compatmode === 'undefined') {
  63. this.compatmode = true;
  64. }
  65. if (typeof this.verifyservercert === 'undefined') {
  66. this.verifyservercert = false;
  67. }
  68. if (typeof this.keepalive === 'undefined') {
  69. this.keepalive = 60;
  70. } else if (typeof this.keepalive === 'string') {
  71. this.keepalive = Number(this.keepalive);
  72. }
  73. if (typeof this.cleansession === 'undefined') {
  74. this.cleansession = true;
  75. }
  76.  
  77. // Create the URL to pass in to the MQTT.js library
  78. if (this.brokerurl === "") {
  79. if (this.usetls) {
  80. this.brokerurl = "mqtts://";
  81. } else {
  82. this.brokerurl = "mqtt://";
  83. }
  84. if (this.broker !== "") {
  85. this.brokerurl = this.brokerurl + this.broker + ":" + this.port;
  86. } else {
  87. this.brokerurl = this.brokerurl + "localhost:1883";
  88. }
  89. }
  90.  
  91. // Build options for passing to the MQTT.js API
  92. this.options.clientId = this.clientid || 'mqtt_' + (1 + Math.random() * 4294967295).toString(16);
  93. this.options.username = this.username;
  94. this.options.password = this.password;
  95. this.options.keepalive = this.keepalive;
  96. this.options.clean = this.cleansession;
  97. this.options.reconnectPeriod = this.reconnectPeriod || 5000; //RED Settings
  98.  
  99. if (this.compatmode === true) {
  100. this.options.protocolId = 'MQIsdp';
  101. this.options.protocolVersion = 3;
  102. }
  103.  
  104. if (this.willTopic) {
  105. this.options.will = {
  106. topic: this.willTopic,
  107. payload: this.willPayload || "",
  108. qos: Number(this.willQos || 0),
  109. retain: this.willRetain == "true" || n.willRetain === true
  110. };
  111. }
  112.  
  113. // Define functions called by MQTT in and out brokers
  114. var broker = this;
  115. this.users = {};
  116.  
  117. this.connect = function () {
  118. if (!broker.connected && !broker.connecting) {
  119. broker.connecting = true;
  120. broker.client = mqtt.connect(broker.brokerurl, broker.options);
  121. broker.client.setMaxListeners(0);
  122. // Register successful connect or reconnect handler
  123. broker.client.on('connect', function () {
  124. broker.connecting = false;
  125. broker.connected = true;
  126. console.log(broker.connected);
  127.  
  128. // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
  129. broker.client.removeAllListeners('message');
  130.  
  131. // Re-subscribe to stored topics
  132. for (var s in broker.subscriptions) {
  133. if (broker.subscriptions.hasOwnProperty(s)) {
  134. var topic = s;
  135. var qos = 0;
  136. for (var r in broker.subscriptions[s]) {
  137. if (broker.subscriptions[s].hasOwnProperty(r)) {
  138. qos = Math.max(qos, broker.subscriptions[s][r].qos);
  139. broker.client.on('message', broker.subscriptions[s][r].handler);
  140. }
  141. }
  142. var options = {qos: qos};
  143. broker.client.subscribe(topic, options);
  144. }
  145. }
  146.  
  147. // Send any birth message
  148. if (broker.birthMessage) {
  149. broker.publish(broker.birthMessage);
  150. }
  151. });
  152. broker.client.on("reconnect", function () {
  153. // Reconnect
  154. });
  155.  
  156.  
  157. // Register disconnect handlers
  158. broker.client.on('close', function () {
  159. if (broker.connected) {
  160. broker.connected = false;
  161. } else if (broker.connecting) {
  162. }
  163. });
  164.  
  165. // Register connect error handler
  166. broker.client.on('error', function (error) {
  167. if (broker.connecting) {
  168. broker.client.end();
  169. broker.connecting = false;
  170. }
  171. });
  172. }
  173. };
  174.  
  175. this.subscribe = function (topic, qos, callback, ref) {
  176. ref = ref || 0;
  177. broker.subscriptions[topic] = broker.subscriptions[topic] || {};
  178. var sub = {
  179. topic: topic,
  180. qos: qos,
  181. handler: function (mtopic, mpayload, mpacket) {
  182. if (matchTopic(topic, mtopic)) {
  183. callback(mtopic, mpayload, mpacket);
  184. }
  185. },
  186. ref: ref
  187. };
  188. broker.subscriptions[topic][ref] = sub;
  189. if (broker.connected) {
  190. broker.client.on('message', sub.handler);
  191. var options = {};
  192. options.qos = qos;
  193. broker.client.subscribe(topic, options);
  194. }
  195. };
  196.  
  197. this.unsubscribe = function (topic, ref) {
  198. ref = ref || 0;
  199. var sub = broker.subscriptions[topic];
  200. if (sub) {
  201. if (sub[ref]) {
  202. broker.client.removeListener('message', sub[ref].handler);
  203. delete sub[ref];
  204. }
  205. if (Object.keys(sub).length === 0) {
  206. delete broker.subscriptions[topic];
  207. if (broker.connected) {
  208. broker.client.unsubscribe(topic);
  209. }
  210. }
  211. }
  212. };
  213.  
  214. this.publish = function (msg) {
  215. if (broker.connected) {
  216. if (!Buffer.isBuffer(msg.payload)) {
  217. if (typeof msg.payload === "object") {
  218. msg.payload = JSON.stringify(msg.payload);
  219. } else if (typeof msg.payload !== "string") {
  220. msg.payload = "" + msg.payload;
  221. }
  222. }
  223.  
  224. var options = {
  225. qos: msg.qos || 0,
  226. retain: msg.retain || false
  227. };
  228. broker.client.publish(msg.topic, msg.payload, options, function (err) {
  229. return err
  230. });
  231. }
  232. };
  233. }
  234.  
  235. topic = "chat/#";
  236. qos = 2;
  237. if (isNaN(qos) || qos < 0 || qos > 2) {
  238. qos = 2;
  239. }
  240.  
  241. brokerConn = new MQTTBroker();
  242.  
  243. if (brokerConn) {
  244. if (topic) {
  245. brokerConn.connect();
  246. brokerConn.subscribe(topic, qos, function (topic, payload, packet) {
  247. if (isUtf8(payload)) {
  248. payload = payload.toString();
  249. }
  250. var msg = {topic: topic, payload: payload, qos: packet.qos, retain: packet.retain};
  251. if ((brokerConn.broker === "localhost") || (brokerConn.broker === "127.0.0.1")) {
  252. msg._topic = topic;
  253. }
  254. console.log(msg);
  255. }, topic);
  256. }
  257. else {
  258.  
  259. }
  260. } else {
  261.  
  262. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement