Maxouille

socket.js

Jul 4th, 2019
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 17.96 KB | None | 0 0
  1. /**
  2. * Connections
  3. * Pokemon Showdown - http://pokemonshowdown.com/
  4. *
  5. * Abstraction layer for multi-process SockJS connections.
  6. *
  7. * This file handles all the communications between the users'
  8. * browsers, the networking processes, and users.js in the
  9. * main process.
  10. *
  11. * @license MIT license
  12. */
  13.  
  14. 'use strict';
  15.  
  16. const cluster = require('cluster');
  17. const fs = require('fs');
  18.  
  19. if (cluster.isMaster) {
  20. cluster.setupMaster({
  21. exec: require('path').resolve(__dirname, 'sockets'),
  22. });
  23.  
  24. const workers = exports.workers = new Map();
  25.  
  26. const spawnWorker = exports.spawnWorker = function () {
  27. let worker = cluster.fork({PSPORT: Config.port, PSBINDADDR: Config.bindaddress || '0.0.0.0', PSNOSSL: Config.ssl ? 0 : 1});
  28. let id = worker.id;
  29. workers.set(id, worker);
  30. worker.on('message', data => {
  31. // console.log('master received: ' + data);
  32. switch (data.charAt(0)) {
  33. case '*': {
  34. // *socketid, ip, protocol
  35. // connect
  36. let nlPos = data.indexOf('\n');
  37. let nlPos2 = data.indexOf('\n', nlPos + 1);
  38. Users.socketConnect(worker, id, data.slice(1, nlPos), data.slice(nlPos + 1, nlPos2), data.slice(nlPos2 + 1));
  39. break;
  40. }
  41.  
  42. case '!': {
  43. // !socketid
  44. // disconnect
  45. Users.socketDisconnect(worker, id, data.substr(1));
  46. break;
  47. }
  48.  
  49. case '<': {
  50. // <socketid, message
  51. // message
  52. let nlPos = data.indexOf('\n');
  53. Users.socketReceive(worker, id, data.substr(1, nlPos - 1), data.substr(nlPos + 1));
  54. break;
  55. }
  56.  
  57. default:
  58. // unhandled
  59. }
  60. });
  61.  
  62. return worker;
  63. };
  64.  
  65. cluster.on('exit', (worker, code, signal) => {
  66. if (code === null && signal !== null) {
  67. // Worker was killed by Sockets.killWorker or Sockets.killPid, probably.
  68. console.log(`Worker ${worker.id} was forcibly killed with status ${signal}.`);
  69. workers.delete(worker.id);
  70. } else if (code === 0 && signal === null) {
  71. // Happens when killing PS with ^C
  72. console.log(`Worker ${worker.id} died, but returned a successful exit code.`);
  73. workers.delete(worker.id);
  74. } else if (code > 0) {
  75. // Worker was killed abnormally, likely because of a crash.
  76. require('./lib/crashlogger')(new Error(`Worker ${worker.id} abruptly died with code ${code} and signal ${signal}`), "The main process");
  77. // Don't delete the worker so it can be inspected if need be.
  78. }
  79.  
  80. if (worker.isConnected()) worker.disconnect();
  81. // FIXME: this is a bad hack to get around a race condition in
  82. // Connection#onDisconnect sending room deinit messages after already
  83. // having removed the sockets from their channels.
  84. worker.send = () => {};
  85.  
  86. let count = 0;
  87. Users.connections.forEach(connection => {
  88. if (connection.worker === worker) {
  89. Users.socketDisconnect(worker, worker.id, connection.socketid);
  90. count++;
  91. }
  92. });
  93. console.log(`${count} connections were lost.`);
  94.  
  95. // Attempt to recover.
  96. spawnWorker();
  97. });
  98.  
  99. exports.listen = function (port, bindAddress, workerCount) {
  100. if (port !== undefined && !isNaN(port)) {
  101. Config.port = port;
  102. Config.ssl = null;
  103. } else {
  104. port = Config.port;
  105.  
  106. // Autoconfigure when running in cloud environments.
  107. try {
  108. const cloudenv = require('cloud-env');
  109. bindAddress = cloudenv.get('IP', bindAddress);
  110. port = cloudenv.get('PORT', port);
  111. } catch (e) {}
  112. }
  113. if (bindAddress !== undefined) {
  114. Config.bindaddress = bindAddress;
  115. }
  116. if (workerCount === undefined) {
  117. workerCount = (Config.workers !== undefined ? Config.workers : 1);
  118. }
  119. for (let i = 0; i < workerCount; i++) {
  120. spawnWorker();
  121. }
  122. };
  123.  
  124. exports.killWorker = function (worker) {
  125. let count = 0;
  126. Users.connections.forEach(connection => {
  127. if (connection.worker === worker) {
  128. Users.socketDisconnect(worker, worker.id, connection.socketid);
  129. count++;
  130. }
  131. });
  132. console.log(`${count} connections were lost.`);
  133.  
  134. try {
  135. worker.kill('SIGTERM');
  136. } catch (e) {}
  137.  
  138. return count;
  139. };
  140.  
  141. exports.killPid = function (pid) {
  142. for (const worker of workers.values()) {
  143. if (pid === worker.process.pid) {
  144. return this.killWorker(worker);
  145. }
  146. }
  147. return false;
  148. };
  149.  
  150. exports.socketSend = function (worker, socketid, message) {
  151. worker.send(`>${socketid}\n${message}`);
  152. };
  153. exports.socketDisconnect = function (worker, socketid) {
  154. worker.send(`!${socketid}`);
  155. };
  156.  
  157. exports.channelBroadcast = function (channelid, message) {
  158. workers.forEach(worker => {
  159. worker.send(`#${channelid}\n${message}`);
  160. });
  161. };
  162. exports.channelSend = function (worker, channelid, message) {
  163. worker.send(`#${channelid}\n${message}`);
  164. };
  165. exports.channelAdd = function (worker, channelid, socketid) {
  166. worker.send(`+${channelid}\n${socketid}`);
  167. };
  168. exports.channelRemove = function (worker, channelid, socketid) {
  169. worker.send(`-${channelid}\n${socketid}`);
  170. };
  171.  
  172. exports.subchannelBroadcast = function (channelid, message) {
  173. workers.forEach(worker => {
  174. worker.send(`:${channelid}\n${message}`);
  175. });
  176. };
  177. exports.subchannelMove = function (worker, channelid, subchannelid, socketid) {
  178. worker.send(`.${channelid}\n${subchannelid}\n${socketid}`);
  179. };
  180. } else {
  181. // is worker
  182. // @ts-ignore This file doesn't exist on the repository, so Travis checks fail if this isn't ignored
  183. global.Config = require('./config/config');
  184.  
  185. if (process.env.PSPORT) Config.port = +process.env.PSPORT;
  186. if (process.env.PSBINDADDR) Config.bindaddress = process.env.PSBINDADDR;
  187. if (+process.env.PSNOSSL) Config.ssl = null;
  188.  
  189. if (Config.ofe) {
  190. try {
  191. require.resolve('node-oom-heapdump');
  192. } catch (e) {
  193. if (e.code !== 'MODULE_NOT_FOUND') throw e; // should never happen
  194. throw new Error(
  195. 'node-oom-heapdump is not installed, but it is a required dependency if Config.ofe is set to true! ' +
  196. 'Run npm install node-oom-heapdump and restart the server.'
  197. );
  198. }
  199.  
  200. // Create a heapdump if the process runs out of memory.
  201. require('node-oom-heapdump')({
  202. addTimestamp: true,
  203. });
  204. }
  205.  
  206. // Static HTTP server
  207.  
  208. // This handles the custom CSS and custom avatar features, and also
  209. // redirects yourserver:8001 to yourserver-8001.psim.us
  210.  
  211. // It's optional if you don't need these features.
  212.  
  213. global.Dnsbl = require('./dnsbl');
  214.  
  215. if (Config.crashguard) {
  216. // graceful crash
  217. process.on('uncaughtException', err => {
  218. require('./lib/crashlogger')(err, `Socket process ${cluster.worker.id} (${process.pid})`, true);
  219. });
  220. }
  221.  
  222. let app = require('http').createServer();
  223. let appssl = null;
  224. if (Config.ssl) {
  225. let key;
  226. try {
  227. key = require('path').resolve(__dirname, Config.ssl.options.key);
  228. if (!fs.lstatSync(key).isFile()) throw new Error();
  229. try {
  230. key = fs.readFileSync(key);
  231. } catch (e) {
  232. require('./lib/crashlogger')(new Error(`Failed to read the configured SSL private key PEM file:\n${e.stack}`), `Socket process ${cluster.worker.id} (${process.pid})`, true);
  233. }
  234. } catch (e) {
  235. console.warn('SSL private key config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.');
  236. key = Config.ssl.options.key;
  237. }
  238.  
  239. let cert;
  240. try {
  241. cert = require('path').resolve(__dirname, Config.ssl.options.cert);
  242. if (!fs.lstatSync(cert).isFile()) throw new Error();
  243. try {
  244. cert = fs.readFileSync(cert);
  245. } catch (e) {
  246. require('./lib/crashlogger')(new Error(`Failed to read the configured SSL certificate PEM file:\n${e.stack}`), `Socket process ${cluster.worker.id} (${process.pid})`, true);
  247. }
  248. } catch (e) {
  249. console.warn('SSL certificate config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.');
  250. cert = Config.ssl.options.cert;
  251. }
  252.  
  253. if (key && cert) {
  254. try {
  255. // In case there are additional SSL config settings besides the key and cert...
  256. appssl = require('https').createServer(Object.assign({}, Config.ssl.options, {key, cert}));
  257. } catch (e) {
  258. require('./lib/crashlogger')(`The SSL settings are misconfigured:\n${e.stack}`, `Socket process ${cluster.worker.id} (${process.pid})`, true);
  259. }
  260. }
  261. }
  262.  
  263. // Static server
  264. const StaticServer = require('node-static').Server;
  265. const roomidRegex = /^\/(?:[A-Za-z0-9][A-Za-z0-9-]*)\/?$/;
  266. const cssServer = new StaticServer('./config');
  267. const avatarServer = new StaticServer('./config/avatars');
  268. const staticServer = new StaticServer('./static');
  269. const staticRequestHandler = (req, res) => {
  270. // console.log(`static rq: ${req.socket.remoteAddress}:${req.socket.remotePort} -> ${req.socket.localAddress}:${req.socket.localPort} - ${req.method} ${req.url} ${req.httpVersion} - ${req.rawHeaders.join('|')}`);
  271. req.resume();
  272. req.addListener('end', () => {
  273. if (Config.customhttpresponse &&
  274. Config.customhttpresponse(req, res)) {
  275. return;
  276. }
  277.  
  278. let server = staticServer;
  279. if (req.url === '/custom.css') {
  280. server = cssServer;
  281. } else if (req.url.startsWith('/avatars/')) {
  282. req.url = req.url.substr(8);
  283. server = avatarServer;
  284. } else if (roomidRegex.test(req.url)) {
  285. req.url = '/';
  286. }
  287.  
  288. server.serve(req, res, e => {
  289. if (e && (e.status === 404)) {
  290. staticServer.serveFile('404.html', 404, {}, req, res);
  291. }
  292. });
  293. });
  294. };
  295.  
  296. app.on('request', staticRequestHandler);
  297. if (appssl) appssl.on('request', staticRequestHandler);
  298.  
  299. // SockJS server
  300.  
  301. // This is the main server that handles users connecting to our server
  302. // and doing things on our server.
  303.  
  304. const sockjs = require('sockjs');
  305. const options = {
  306. sockjs_url: "//play.pokemonshowdown.com/js/lib/sockjs-1.1.1-nwjsfix.min.js",
  307. prefix: '/showdown',
  308. log(severity, message) {
  309. if (severity === 'error') console.log(`ERROR: ${message}`);
  310. },
  311. };
  312.  
  313. if (Config.wsdeflate) {
  314. try {
  315. const deflate = require('permessage-deflate').configure(Config.wsdeflate);
  316. options.faye_server_options = {extensions: [deflate]};
  317. } catch (e) {
  318. require('./lib/crashlogger')(new Error("Dependency permessage-deflate is not installed or is otherwise unaccessable. No message compression will take place until server restart."), "Sockets");
  319. }
  320. }
  321.  
  322. const server = sockjs.createServer(options);
  323. const sockets = new Map();
  324. const channels = new Map();
  325. const subchannels = new Map();
  326.  
  327. // Deal with phantom connections.
  328. const sweepSocketInterval = setInterval(() => {
  329. sockets.forEach(socket => {
  330. if (socket.protocol === 'xhr-streaming' &&
  331. socket._session &&
  332. socket._session.recv) {
  333. socket._session.recv.didClose();
  334. }
  335.  
  336. // A ghost connection's `_session.to_tref._idlePrev` (and `_idleNext`) property is `null` while
  337. // it is an object for normal users. Under normal circumstances, those properties should only be
  338. // `null` when the timeout has already been called, but somehow it's not happening for some connections.
  339. // Simply calling `_session.timeout_cb` (the function bound to the aformentioned timeout) manually
  340. // on those connections kills those connections. For a bit of background, this timeout is the timeout
  341. // that sockjs sets to wait for users to reconnect within that time to continue their session.
  342. if (socket._session &&
  343. socket._session.to_tref &&
  344. !socket._session.to_tref._idlePrev) {
  345. socket._session.timeout_cb();
  346. }
  347. });
  348. }, 1000 * 60 * 10);
  349.  
  350. process.on('message', data => {
  351. // console.log('worker received: ' + data);
  352. let socket = null;
  353. let socketid = '';
  354. let channel = null;
  355. let channelid = '';
  356. let subchannel = null;
  357. let subchannelid = '';
  358. let nlLoc = -1;
  359. let message = '';
  360.  
  361. switch (data.charAt(0)) {
  362. case '$': // $code
  363. eval(data.substr(1));
  364. break;
  365.  
  366. case '!': // !socketid
  367. // destroy
  368. socketid = data.substr(1);
  369. socket = sockets.get(socketid);
  370. if (!socket) return;
  371. socket.destroy();
  372. sockets.delete(socketid);
  373. channels.forEach(channel => channel.delete(socketid));
  374. break;
  375.  
  376. case '>':
  377. // >socketid, message
  378. // message
  379. nlLoc = data.indexOf('\n');
  380. socketid = data.substr(1, nlLoc - 1);
  381. socket = sockets.get(socketid);
  382. if (!socket) return;
  383. message = data.substr(nlLoc + 1);
  384. socket.write(message);
  385. break;
  386.  
  387. case '#':
  388. // #channelid, message
  389. // message to channel
  390. nlLoc = data.indexOf('\n');
  391. channelid = data.substr(1, nlLoc - 1);
  392. channel = channels.get(channelid);
  393. if (!channel) return;
  394. message = data.substr(nlLoc + 1);
  395. channel.forEach(socket => socket.write(message));
  396. break;
  397.  
  398. case '+':
  399. // +channelid, socketid
  400. // add to channel
  401. nlLoc = data.indexOf('\n');
  402. socketid = data.substr(nlLoc + 1);
  403. socket = sockets.get(socketid);
  404. if (!socket) return;
  405. channelid = data.substr(1, nlLoc - 1);
  406. channel = channels.get(channelid);
  407. if (!channel) {
  408. channel = new Map();
  409. channels.set(channelid, channel);
  410. }
  411. channel.set(socketid, socket);
  412. break;
  413.  
  414. case '-':
  415. // -channelid, socketid
  416. // remove from channel
  417. nlLoc = data.indexOf('\n');
  418. channelid = data.slice(1, nlLoc);
  419. channel = channels.get(channelid);
  420. if (!channel) return;
  421. socketid = data.slice(nlLoc + 1);
  422. channel.delete(socketid);
  423. subchannel = subchannels.get(channelid);
  424. if (subchannel) subchannel.delete(socketid);
  425. if (!channel.size) {
  426. channels.delete(channelid);
  427. if (subchannel) subchannels.delete(channelid);
  428. }
  429. break;
  430.  
  431. case '.':
  432. // .channelid, subchannelid, socketid
  433. // move subchannel
  434. nlLoc = data.indexOf('\n');
  435. channelid = data.slice(1, nlLoc);
  436. let nlLoc2 = data.indexOf('\n', nlLoc + 1);
  437. subchannelid = data.slice(nlLoc + 1, nlLoc2);
  438. socketid = data.slice(nlLoc2 + 1);
  439.  
  440. subchannel = subchannels.get(channelid);
  441. if (!subchannel) {
  442. subchannel = new Map();
  443. subchannels.set(channelid, subchannel);
  444. }
  445. if (subchannelid === '0') {
  446. subchannel.delete(socketid);
  447. } else {
  448. subchannel.set(socketid, subchannelid);
  449. }
  450. break;
  451.  
  452. case ':':
  453. // :channelid, message
  454. // message to subchannel
  455. nlLoc = data.indexOf('\n');
  456. channelid = data.slice(1, nlLoc);
  457. channel = channels.get(channelid);
  458. if (!channel) return;
  459.  
  460. let messages = [null, null, null];
  461. message = data.substr(nlLoc + 1);
  462. subchannel = subchannels.get(channelid);
  463. channel.forEach((socket, socketid) => {
  464. switch (subchannel ? subchannel.get(socketid) : '0') {
  465. case '1':
  466. if (!messages[1]) {
  467. messages[1] = message.replace(/\n\|split\n[^\n]*\n([^\n]*)\n[^\n]*\n[^\n]*/g, '\n$1');
  468. }
  469. socket.write(messages[1]);
  470. break;
  471. case '2':
  472. if (!messages[2]) {
  473. messages[2] = message.replace(/\n\|split\n[^\n]*\n[^\n]*\n([^\n]*)\n[^\n]*/g, '\n$1');
  474. }
  475. socket.write(messages[2]);
  476. break;
  477. default:
  478. if (!messages[0]) {
  479. messages[0] = message.replace(/\n\|split\n([^\n]*)\n[^\n]*\n[^\n]*\n[^\n]*/g, '\n$1');
  480. }
  481. socket.write(messages[0]);
  482. break;
  483. }
  484. });
  485. break;
  486. }
  487. });
  488.  
  489. // Clean up any remaining connections on disconnect. If this isn't done,
  490. // the process will not exit until any remaining connections have been destroyed.
  491. // Afterwards, the worker process will die on its own.
  492. process.once('disconnect', () => {
  493. clearInterval(sweepSocketInterval);
  494.  
  495. sockets.forEach(socket => {
  496. try {
  497. socket.destroy();
  498. } catch (e) {}
  499. });
  500. sockets.clear();
  501. channels.clear();
  502. subchannels.clear();
  503.  
  504. app.close();
  505. if (appssl) appssl.close();
  506.  
  507. // Let the server(s) finish closing.
  508. setImmediate(() => process.exit(0));
  509. });
  510.  
  511. // this is global so it can be hotpatched if necessary
  512. let isTrustedProxyIp = Dnsbl.checker(Config.proxyip);
  513. let socketCounter = 0;
  514. server.on('connection', socket => {
  515. // For reasons that are not entirely clear, SockJS sometimes triggers
  516. // this event with a null `socket` argument.
  517. if (!socket) return;
  518.  
  519. if (!socket.remoteAddress) {
  520. // SockJS sometimes fails to be able to cache the IP, port, and
  521. // address from connection request headers.
  522. try {
  523. socket.destroy();
  524. } catch (e) {}
  525. return;
  526. }
  527.  
  528. let socketid = '' + (++socketCounter);
  529. sockets.set(socketid, socket);
  530.  
  531. // Change here -- Dragotic
  532. let socketip = socket.remoteAddress;
  533. if (socket.remoteAddress === '127.0.0.1' && socket.headers["x-forwarded-for"]) socketip = socket.headers["x-forwarded-for"].split(",")[0];
  534. if (isTrustedProxyIp(socketip)) {
  535. let ips = (socket.headers['x-forwarded-for'] || '')
  536. .split(',')
  537. .reverse();
  538. for (let ip of ips) {
  539. let proxy = ip.trim();
  540. if (!isTrustedProxyIp(proxy)) {
  541. socketip = proxy;
  542. break;
  543. }
  544. }
  545. }
  546.  
  547. process.send(`*${socketid}\n${socketip}\n${socket.protocol}`);
  548.  
  549. socket.on('data', message => {
  550. // drop empty messages (DDoS?)
  551. if (!message) return;
  552. // drop messages over 100KB
  553. if (message.length > (100 * 1024)) {
  554. console.log(`Dropping client message ${message.length / 1024} KB...`);
  555. console.log(message.slice(0, 160));
  556. return;
  557. }
  558. // drop legacy JSON messages
  559. if (typeof message !== 'string' || message.startsWith('{')) return;
  560. // drop blank messages (DDoS?)
  561. let pipeIndex = message.indexOf('|');
  562. if (pipeIndex < 0 || pipeIndex === message.length - 1) return;
  563.  
  564. process.send(`<${socketid}\n${message}`);
  565. });
  566.  
  567. socket.once('close', () => {
  568. process.send(`!${socketid}`);
  569. sockets.delete(socketid);
  570. channels.forEach(channel => channel.delete(socketid));
  571. });
  572. });
  573. server.installHandlers(app, {});
  574. app.listen(Config.port, Config.bindaddress);
  575. console.log(`Worker ${cluster.worker.id} now listening on ${Config.bindaddress}:${Config.port}`);
  576.  
  577. if (appssl) {
  578. server.installHandlers(appssl, {});
  579. appssl.listen(Config.ssl.port, Config.bindaddress);
  580. console.log(`Worker ${cluster.worker.id} now listening for SSL on port ${Config.ssl.port}`);
  581. }
  582.  
  583. console.log(`Test your server at http://${Config.bindaddress === '0.0.0.0' ? 'localhost' : Config.bindaddress}:${Config.port}`);
  584.  
  585. require('./lib/repl').start(`sockets-${cluster.worker.id}-${process.pid}`, cmd => eval(cmd));
  586. }
Advertisement
Add Comment
Please, Sign In to add comment