const User = require('./models/user');
const UserPreference = require('./models/user-preference');
const Device = require('./models/device');
const _ = require("lodash");
const ObjectId = require('mongoose').Types.ObjectId;
const handle = require('./handle');
const crypto = require('crypto');

// NOTE(review): this id bypasses the per-device ownership check in
// validateRequest. It is kept as-is from the original code; consider moving it
// to configuration or replacing it with a role flag on the user document.
const SUPERUSER_ID = "5b46d2839dc37000a4931a35"; // mohit

/**
 * Builds the error object handed to aedes when a client must be rejected.
 * returnCode 4 is the MQTT 3.1.1 CONNACK code for "bad user name or password".
 *
 * @returns {Error} error carrying `returnCode = 4`
 */
function createAuthError() {
  const error = new Error('Auth error');
  error.returnCode = 4;
  return error;
}

/**
 * Compares the password supplied by the client with the stored one.
 *
 * @param {Buffer|string|undefined} supplied - password from the CONNECT packet (may be absent)
 * @param {*} expected - stored `mqttPassword`; anything that is not a string never matches
 * @returns {boolean} true only when both values are present and identical
 */
function passwordMatches(supplied, expected) {
  // A client may send a username without a password; treat that as a mismatch
  // instead of throwing on `undefined.toString()`.
  if (supplied == null || typeof expected !== 'string') return false;
  const suppliedBytes = Buffer.from(supplied.toString());
  const expectedBytes = Buffer.from(expected);
  // timingSafeEqual throws on length mismatch, so check the length first.
  return suppliedBytes.length === expectedBytes.length &&
    crypto.timingSafeEqual(suppliedBytes, expectedBytes);
}

/**
 * Looks up the user or device identified by `id` and verifies its MQTT password.
 *
 * @param {string} userType - 'user' or 'device'; any other value is rejected
 * @param {string} id - document id taken from the MQTT username
 * @param {Buffer|string|undefined} password - password supplied by the client
 * @returns {Promise<boolean>} whether the credentials are valid
 */
async function checkCredentials(userType, id, password) {
  if (userType === 'user') {
    const user = await User.findById(id);
    return passwordMatches(password, _.get(user, 'mqttPassword'));
  }
  if (userType === 'device') {
    const device = await Device.findById(id);
    if (!device) return false;
    // Loose comparison kept on purpose: it preserves the original acceptance
    // of truthy-numeric `enabled` values such as 1.
    const result = device.enabled == true &&
      passwordMatches(password, _.get(device, 'mqttPassword'));
    if (!result) console.error("not authorized, device id = ", device.id);
    return result;
  }
  // Unknown prefix: reject explicitly so the connection does not hang.
  return false;
}

/**
 * aedes `authenticate` hook. Accepts the connection if the username has the
 * form `<user|device>-<id>` and the password matches the stored `mqttPassword`.
 * On success the id and type are stored on the client as `client.user` and
 * `client.userType`.
 *
 * @param {object} client - aedes client
 * @param {string} username - MQTT username, expected as `<type>-<id>`
 * @param {Buffer|string|undefined} password - MQTT password
 * @param {function(Error|null, boolean|null)} callback - aedes completion callback
 */
const authenticate = async function (client, username, password, callback) {
  const errorAuth = createAuthError();
  console.log("authorize :-", username);
  if (!username) return callback(errorAuth);

  const userSplit = username.split("-");
  if (userSplit.length !== 2) return callback(errorAuth);
  const [userType, id] = userSplit;

  let authorized = false;
  try {
    authorized = await checkCredentials(userType, id, password);
  } catch (err) {
    // Lookup failures (e.g. an id that cannot be cast, or a DB outage) deny access.
    console.error("authentication lookup failed", err);
  }

  if (authorized) {
    client.user = id;
    client.userType = userType;
    callback(null, authorized);
  } else {
    callback(errorAuth, null);
  }
};

/**
 * Runs validateRequest and converts a thrown/rejected lookup into a denial so
 * the aedes callback is always invoked.
 *
 * @param {object} client - authenticated aedes client
 * @param {object} packet - publish packet or subscription (must carry `topic`)
 * @returns {Promise<Error|null>} null when allowed, an Error otherwise
 */
async function safeValidate(client, packet) {
  try {
    return await validateRequest(client, packet);
  } catch (err) {
    console.error("authorization check failed", err);
    return createAuthError();
  }
}

/**
 * aedes `authorizePublish` hook. Allows the publish only when validateRequest
 * accepts the client for the device id found in the topic.
 *
 * @param {object} client - authenticated aedes client
 * @param {object} packet - publish packet
 * @param {function(Error|null)} callback - aedes completion callback
 */
const authorizePublish = async function (client, packet, callback) {
  const err = await safeValidate(client, packet);
  if (err) {
    console.error("Publish not authorized");
  }
  callback(err);
  if (err) return;

  // Only authorized publishes may trigger the toggle side effect.
  // NOTE(review): `deviceToggled` is not declared in this file — presumably a
  // global defined elsewhere; confirm.
  try {
    deviceToggled(client, packet.topic, packet.payload);
  } catch (toggleErr) {
    console.error("deviceToggled failed", toggleErr);
  }
};

/**
 * aedes `authorizeSubscribe` hook. Allows the subscription only when
 * validateRequest accepts the client for the device id found in the topic.
 *
 * @param {object} client - authenticated aedes client
 * @param {object} sub - subscription (carries `topic`)
 * @param {function(Error|null, object)} callback - aedes completion callback
 */
const authorizeSubscribe = async function (client, sub, callback) {
  const err = await safeValidate(client, sub);
  if (err) {
    console.error("Subscribe not authorized");
  }
  callback(err, sub);
};

/**
 * Checks whether `client` may access the device named in the packet topic.
 * The device id is the second `/`-separated topic segment.
 *
 * - users: allowed when the user document lists the device in `devices`
 *   (or the user is the hard-coded superuser);
 * - devices: allowed for their own id, or when the target device has this
 *   client as its `primaryDeviceId`;
 * - anything else is denied.
 *
 * @param {object} client - aedes client with `user` and `userType` set by authenticate
 * @param {object} packet - object carrying a string `topic`
 * @returns {Promise<Error|null>} null when allowed, an Error with returnCode 4 otherwise
 */
async function validateRequest(client, packet) {
  const error = createAuthError();

  const topic = _.get(packet, 'topic');
  if (typeof topic !== 'string') return error;

  const topicSplit = topic.split('/');
  if (topicSplit.length < 2) return error;

  const deviceId = topicSplit[1];
  if (!deviceId || !ObjectId.isValid(deviceId)) return error;

  if (client.userType === 'user') {
    if (client.user === SUPERUSER_ID) return null;
    const users = await User.find({ _id: new ObjectId(client.user), devices: deviceId });
    return _.isEmpty(users) ? error : null;
  }

  if (client.userType === 'device') {
    if (client.user === deviceId) return null;
    const devices = await Device.find({
      primaryDeviceId: new ObjectId(client.user),
      _id: new ObjectId(deviceId)
    });
    return _.isEmpty(devices) ? error : null;
  }

  // Fail closed: an unknown or missing client type must never be authorized.
  return error;
}

const mongoPersistence = require('aedes-persistence-mongodb');
const mq_mongodb = require('mqemitter-mongodb');

const mq = mq_mongodb({
  url: process.env.MONGO_MQTT_URL
});
// Exposed globally so other modules can reach the emitter.
global.mqEmitter = mq;

const settings = {
  mq: mq,
  persistence: mongoPersistence({
    url: process.env.MONGO_MQTT_URL,
    ttl: {
      // Number of seconds
      packets: {
        incoming: 100,
        outgoing: 100,
        will: 300,
        retained: -1
      },
      // NOTE(review): 1 second looks very short for subscriptions — confirm intent.
      subscriptions: 1,
    },
    dropExistingIndexes: true
  })
};

const stats = require('aedes-stats');
const aedes = require('aedes')(settings);
stats(aedes, { interval: 60 * 1000 });

const server = require('net').createServer(aedes.handle);
const httpServer = require('http').createServer();
const ws = require('websocket-stream');
const httpPort = process.env.MQTTWS_PORT;
const port = process.env.MQTT_PORT;

server.listen(port, function () {
  console.log('server started and listening on port ', port);
});

aedes.authenticate = authenticate;
aedes.authorizePublish = authorizePublish;
aedes.authorizeSubscribe = authorizeSubscribe;

aedes.on('client', function (client) {
  console.log('client connected', client.id);
});

// NOTE(review): `MqttListener` and `ActivityLog` are not declared in this file —
// presumably globals defined elsewhere; confirm. `client` is passed through
// unchanged, so those handlers must cope with whatever aedes supplies here.
aedes.on('publish', function (packet, client) {
  let payload = _.get(packet, 'payload');
  if (payload) payload = payload.toString('utf8');
  console.log('Published', packet.topic, payload, `by ${process.pid}`);
  MqttListener.published(packet.topic, payload, client);
  ActivityLog.recordActivity(client, packet);
});

// MQTT over WebSocket shares the same aedes handler.
ws.createServer({
  server: httpServer
}, aedes.handle);

httpServer.listen(httpPort, function () {
  console.log('aedes websocket server listening on port ', httpPort);
});