- // ZeroMQ PUB/SUB + Multicast (PGM) enabled event emitter for node.js
- // gleicon - 2011
- var util = require("util");
- var events = require("events");
- var zeromq = require("zeromq"); // check the right path
- var sys = require("sys");
- var address = "epgm://192.168.2.7:9999";
/**
 * EventEmitter whose emitted events are also published on a ZeroMQ PUB
 * socket, and which re-emits locally the events it receives on a SUB socket
 * from other emitters.
 *
 * Wire format: the string "EVENTS " followed by a JSON object
 * { id, event_emitter, event_name, msg }.
 *
 * @param {*} name        Currently unused; kept for caller compatibility.
 * @param {*} remote_node Currently unused; kept for caller compatibility.
 */
function DistEventEmitter(name, remote_node) {
  events.EventEmitter.call(this);

  // Identifies this emitter on the wire so its own packets can be ignored
  // when they come back on the SUB socket.
  // NOTE(review): two emitters created in the same process within the same
  // millisecond get the same id and will ignore each other's packets.
  this.id = new Date().getTime() + "-" + process.pid;

  this.pub_socket = zeromq.createSocket('pub');
  this.sub_socket = zeromq.createSocket('sub');
  // NOTE(review): the callbacks only log an error if one is passed; whether
  // the zeromq binding ever invokes a connect callback is not visible here.
  this.pub_socket.connect(address, function (e) { if (e) console.log(e); });
  this.sub_socket.connect(address, function (e) { if (e) console.log(e); });
  this.sub_socket.subscribe("EVENTS");

  // Bind the handler: without it, `this` inside _dispatch would be whatever
  // object fires the "message" event rather than this emitter.
  this.sub_socket.on("message", this._dispatch.bind(this));
}
util.inherits(DistEventEmitter, events.EventEmitter);

// Keep a handle on the inherited emit before it is overridden below, so
// events can be delivered to local listeners without being re-published.
DistEventEmitter.prototype._emit_local = DistEventEmitter.prototype.emit;

/**
 * Handles one message from the SUB socket: strips the leading "EVENTS "
 * topic, parses the JSON packet and, unless the packet was published by this
 * very emitter, emits it to this instance's local listeners.
 *
 * Any failure (bad JSON, a throwing listener, ...) is logged, not rethrown.
 *
 * @param {Buffer|string} data Raw message as delivered by the socket.
 */
DistEventEmitter.prototype._dispatch = function (data) {
  try {
    var raw = data.toString('utf8');
    // Strip the topic only when it really is the leading prefix, so an
    // "EVENTS " substring inside the payload is never touched.
    if (raw.indexOf('EVENTS ') === 0) raw = raw.slice('EVENTS '.length);
    var d = JSON.parse(raw);
    console.log('r: ' + d.msg);
    // Deliver through _emit_local (not emit) so remote events are not
    // published again, and skip packets that originated here.
    if (d.id !== this.id) this._emit_local(d.event_name, d.msg);
    console.log('rec: ' + d.msg);
  } catch (e) { console.log(e); }
};

/**
 * Publishes a payload to the other emitters.
 *
 * @param {*} w                   Payload; must be JSON-serializable.
 * @param {string} [event_name]   Event name to publish under. Defaults to
 *                                "data", the name previously hard-coded.
 */
DistEventEmitter.prototype._broadcast = function (w, event_name) {
  var pkt = JSON.stringify({ "id": this.id,
                             "event_emitter": "data_ws",
                             "event_name": event_name === undefined ? "data" : event_name,
                             "msg": w });
  this.pub_socket.send("EVENTS " + pkt);
};

/**
 * Emits an event both remotely and locally.
 *
 * Only the first payload argument is published (the wire format carries a
 * single `msg`); local listeners receive every argument.
 *
 * @param {string} evt Event name.
 * @param {*} data     First payload argument.
 * @returns {boolean}  Result of the inherited emit for the local delivery.
 */
DistEventEmitter.prototype.emit = function (evt, data) {
  this._broadcast(data, evt);
  return this._emit_local.apply(this, arguments);
};
// Smoke test: create an emitter, attach logging listeners for the "data" and
// "error" events, then emit two "data" events.
var de = new DistEventEmitter();

de.on("data", function (payload) {
  var line = 'Received data: "' + payload + '"';
  console.log(line);
});

de.on("error", function (payload) {
  var line = 'Received data (Error): "' + payload + '"';
  console.log(line);
});

["It works!", "It works 2"].forEach(function (text) {
  de.emit("data", text);
});