Guest User

Untitled

a guest
Jul 17th, 2018
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.83 KB | None | 0 0
  1. var net = require('net'),
  2. sys = require('sys'),
  3. util = require('util'),
  4. events = require('events');
  5.  
  6. function trim (str) {
  7. var str = str.replace(/^\s\s*/, ''),
  8. ws = /\s/,
  9. i = str.length;
  10. while (ws.test(str.charAt(--i)));
  11. return str.slice(0, i + 1);
  12. }
  13.  
  14. // Parser
  15.  
  16. Parser = function() {
  17. events.EventEmitter(this);
  18.  
  19. this.buffer = '';
  20.  
  21. this.on('frame', function (data) {
  22. //console.log("\n\nFRAME\n" + data + "\n\n\n");
  23. var parts = data.split('\n', -1);
  24. var command = parts.splice(0, 1);
  25. var body = parts.splice(-1, 1);
  26. var headers = new Array();
  27. for (var i = 0; i < parts.length; i++) {
  28. var kv = parts[i].split(':', 2);
  29. if (kv.length == 2)
  30. headers[trim(kv[0])] = trim(kv[1]);
  31. }
  32. //console.log(headers);
  33. this.emit(command, headers, body);
  34. });
  35. };
  36.  
  37. sys.inherits(Parser, events.EventEmitter);
  38.  
  39. Parser.prototype.appendString = function (str) {
  40. this.buffer += str;
  41. this.parse();
  42. };
  43.  
  44. Parser.prototype.parse = function () {
  45. var end = this.buffer.indexOf("\x00");
  46.  
  47. if (end != -1) {
  48. var consume = this.buffer.substr(0, end),
  49. rest = this.buffer.substr(end + 1, (this.buffer.length - consume.length));
  50.  
  51. this.buffer = rest;
  52. this.emit('frame', consume);
  53. this.parse();
  54. }
  55. };
  56.  
  57. // Protocol
  58.  
  59. Protocol = function(stream) {
  60. events.EventEmitter(this);
  61. this.stream = stream;
  62. }
  63. sys.inherits(Protocol, events.EventEmitter);
  64.  
  65. Protocol.prototype.receipt = function(id) {
  66. if (id && this.stream.writable)
  67. this.stream.write('RECEIPT\nreceipt-id:'+id+'\n\n\x00');
  68. }
  69.  
  70.  
  71. Subscription = function() {
  72. this.queue = '';
  73. this.ack = false;
  74. };
  75.  
  76. Client = function() {
  77. this.ip = '';
  78. this.port = '';
  79. this.login= '';
  80. this.password = '';
  81. this.session_id = '';
  82. this.subscription = new Array();
  83. };
  84.  
  85. function Broker () {
  86. events.EventEmitter.call(this);
  87.  
  88. var stream = arguments[0];
  89. var func = arguments[1];
  90. var parser = new Parser();
  91. var proto = new Protocol(stream);
  92. var client = new Client();
  93.  
  94. stream.setEncoding('ascii');
  95.  
  96. stream.on('connect', function(data) {
  97. });
  98.  
  99. stream.on('data', function(data) {
  100. parser.appendString(data);
  101. });
  102.  
  103. parser.on('CONNECT', function(headers, body) {
  104. stream.write('CONNECTED\nsession: 12345\n\n\x00'); // XXX unique session id
  105. });
  106.  
  107. parser.on('SEND', function(headers, body) {
  108. proto.receipt(headers['receipt']);
  109. proto.emit('send', headers['destination'], body);
  110. });
  111.  
  112. parser.on('SUBSCRIBE', function(headers, body) {
  113. proto.receipt(headers['receipt']);
  114. proto.emit('subscribe', headers['destination']);
  115. });
  116.  
  117. parser.on('UNSUBSCRIBE', function(headers, body) {
  118. proto.emit('unsubscribe', headers['destination']);
  119. });
  120.  
  121. func.call(this, proto);
  122. };
  123. util.inherits(Broker, events.EventEmitter);
  124. exports.Broker = Broker;
  125.  
  126. exports.createBroker = function() {
  127. return new Broker(arguments[0], arguments[1]);
  128. };
Add Comment
Please, Sign In to add comment