Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- var net = require('net'),
- sys = require('sys'),
- util = require('util'),
- events = require('events');
/**
 * Strip leading and trailing whitespace from a string.
 *
 * Delegates to the built-in String.prototype.trim, which removes the same
 * whitespace and line-terminator characters that the `\s` regex class matches.
 *
 * @param {string} str - Text to trim.
 * @returns {string} `str` without surrounding whitespace ('' if it was all whitespace).
 * @throws {TypeError} If `str` is null or undefined.
 */
function trim(str) {
  return str.trim();
}
// Parser

/**
 * Incremental parser for NUL-terminated, STOMP-style text frames.
 *
 * Text is fed in with `appendString`. Each complete frame (everything up to a
 * "\x00" byte) is split into a command line, `key:value` header lines and a
 * body, and re-emitted as an event named after the command:
 *
 *     parser.on('SEND', function (headers, body) { ... });
 *
 * `headers` is a plain object of trimmed names to trimmed values (when a
 * header repeats, the last occurrence wins). `body` is a single-element array
 * holding the body text; the array wrapper is kept so existing listeners see
 * the same shape as before.
 *
 * @constructor
 */
function Parser() {
  events.EventEmitter.call(this);
  this.buffer = '';

  this.on('frame', function (data) {
    // Peers may send end-of-line characters after the NUL terminator; they
    // arrive here as leading newlines on the following frame.
    var frame = data.replace(/^[\r\n]+/, '');

    // The first blank line separates the header section from the body, so
    // the body may itself contain newlines and colons.
    var split = frame.indexOf('\n\n');
    var head = split === -1 ? frame : frame.slice(0, split);
    var body = split === -1 ? '' : frame.slice(split + 2);

    var lines = head.split('\n');
    var command = trim(lines.shift());

    // Only dispatch upper-case command names. The command is chosen by the
    // remote peer and is used as an event name on this emitter, so names such
    // as 'frame' or 'error' must never be emitted on its behalf. Empty and
    // malformed frames are dropped here as well.
    if (!/^[A-Z]+$/.test(command)) {
      return;
    }

    // A plain object rather than an Array: with an Array, a header called
    // "length" would throw a RangeError.
    var headers = {};
    for (var i = 0; i < lines.length; i++) {
      var colon = lines[i].indexOf(':');
      if (colon === -1) {
        continue;
      }
      // Split on the first colon only, so values may contain colons.
      headers[trim(lines[i].slice(0, colon))] = trim(lines[i].slice(colon + 1));
    }

    this.emit(command, headers, [body]);
  });
}
util.inherits(Parser, events.EventEmitter);

/**
 * Append received text to the internal buffer and parse any complete frames.
 *
 * @param {string} str - Newly received text; may hold partial or several frames.
 */
Parser.prototype.appendString = function (str) {
  this.buffer += str;
  this.parse();
};

/**
 * Emit a 'frame' event for every complete (NUL-terminated) frame currently in
 * the buffer, leaving any trailing partial frame buffered.
 *
 * Implemented as a loop so that a large batch of frames cannot exhaust the
 * call stack. The buffer is advanced before each emit, so a listener may
 * safely call `appendString` again.
 */
Parser.prototype.parse = function () {
  var end = this.buffer.indexOf('\x00');
  while (end !== -1) {
    var frame = this.buffer.slice(0, end);
    this.buffer = this.buffer.slice(end + 1);
    this.emit('frame', frame);
    end = this.buffer.indexOf('\x00');
  }
};
// Protocol

/**
 * Event emitter bound to one client stream; also writes RECEIPT frames to it.
 *
 * @constructor
 * @param {Object} stream - Stream object; `receipt` reads its `writable`
 *     property and calls its `write` method.
 */
function Protocol(stream) {
  // Must be invoked with .call so the emitter state is set up on this instance.
  events.EventEmitter.call(this);
  this.stream = stream;
}
util.inherits(Protocol, events.EventEmitter);

/**
 * Write a RECEIPT frame for the given receipt id.
 *
 * Nothing is written when `id` is falsy (no receipt requested) or when the
 * stream is no longer writable.
 *
 * @param {string} [id] - Value to send back as the `receipt-id` header.
 */
Protocol.prototype.receipt = function (id) {
  if (id && this.stream.writable) {
    this.stream.write('RECEIPT\nreceipt-id:' + id + '\n\n\x00');
  }
};
/**
 * Plain record for a subscription.
 *
 * Declared as a function rather than assigned to an undeclared name, so it
 * no longer leaks a global and also works in strict mode.
 *
 * Fields (initial values only; nothing in this file populates them yet):
 *   queue - '' (presumably the destination name; confirm against callers)
 *   ack   - false
 *
 * @constructor
 */
function Subscription() {
  this.queue = '';
  this.ack = false;
}
/**
 * Plain record for a connected client.
 *
 * Declared as a function rather than assigned to an undeclared name, so it
 * no longer leaks a global and also works in strict mode.
 *
 * Every string field starts out empty and `subscription` starts as a fresh,
 * empty array per instance.
 *
 * @constructor
 */
function Client() {
  this.ip = '';
  this.port = '';
  this.login = '';
  this.password = '';
  this.session_id = '';
  this.subscription = [];
}
/**
 * Wire a client stream to a frame parser and a Protocol emitter.
 *
 * Incoming stream data is fed to a Parser. Parsed frames are handled as:
 *   CONNECT     - a CONNECTED frame is written back to the stream
 *   SEND        - optional receipt, then proto emits 'send' (destination, body)
 *   SUBSCRIBE   - optional receipt, then proto emits 'subscribe' (destination)
 *   UNSUBSCRIBE - optional receipt, then proto emits 'unsubscribe' (destination)
 *
 * @constructor
 * @param {Object} stream - Client stream; must provide `setEncoding`, `on`
 *     and `write`.
 * @param {Function} func - Called once, synchronously, as
 *     `func.call(broker, proto)` so the caller can attach listeners to the
 *     Protocol instance.
 */
function Broker(stream, func) {
  events.EventEmitter.call(this);

  var parser = new Parser();
  var proto = new Protocol(stream);

  // NOTE(review): Node's 'ascii' decoding is 7-bit only, so non-ASCII bytes
  // in frames are not preserved. Confirm whether 'utf8' is what is wanted.
  stream.setEncoding('ascii');

  stream.on('data', function (data) {
    parser.appendString(data);
  });

  parser.on('CONNECT', function () {
    stream.write('CONNECTED\nsession: 12345\n\n\x00'); // XXX unique session id
  });

  parser.on('SEND', function (headers, body) {
    proto.receipt(headers['receipt']);
    proto.emit('send', headers['destination'], body);
  });

  parser.on('SUBSCRIBE', function (headers) {
    proto.receipt(headers['receipt']);
    proto.emit('subscribe', headers['destination']);
  });

  parser.on('UNSUBSCRIBE', function (headers) {
    // Acknowledge a requested receipt, the same way SEND and SUBSCRIBE do.
    proto.receipt(headers['receipt']);
    proto.emit('unsubscribe', headers['destination']);
  });

  func.call(this, proto);
}
util.inherits(Broker, events.EventEmitter);
- exports.Broker = Broker;
- exports.createBroker = function() {
- return new Broker(arguments[0], arguments[1]);
- };
Add Comment
Please, Sign In to add comment