Advertisement
Guest User

Untitled

a guest
May 9th, 2018
219
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import * as cluster from 'cluster';
  2. import * as os from 'os';
  3. import * as _ from 'lodash';
  4. import * as logger from './services/bunyan-logger';
  5. import * as cookie from 'cookie';
  6. import * as express from 'express';
  7. import { argv } from 'optimist';
  8. import * as bodyParser from 'body-parser';
  9. import * as auth from 'basic-auth';
  10. import * as bcrypt from 'bcryptjs';
  11. import {Request} from 'express-serve-static-core';
  12. import {SIGKILL} from "constants";
  13. import {SIGCONT} from "constants";
  14. import * as http from 'http';
  15. import * as httpProxy from 'http-proxy';
  16.  
  17. import {conf} from "./conf/conf";
  18. // import {commonLib} from "./commonLib";
  19. import {Numbers} from "./numbers";
  20. import {HTTPClient} from "./services/http-client";
  21. import {info} from "./services/info";
  22. import {ObjectCache} from "./services/objectCache/objectCache";
  23. import {trackers} from "./singleton/trackers";
  24.  
  25. const log = logger.child({ from: 'provisioning-app' });
  26.  
  27. process.on('unhandledRejection', function(reason) {
  28.     log.error(reason);
  29. });
  30.  
  31. type Worker = {
  32.     worker: any;
  33.     ctime: number;
  34.     clientsMap: any;
  35.     port: number;
  36.     pid: number;
  37.     idx: number;
  38. };
  39.  
  40. // type Client = {
  41. //     ctime: number;
  42. // };
  43.  
  44. const CLIENT_IS_STALE_TIMEOUT_MINUTES = 2; // alive client keeps cometd longpoll connection, so 2 minutes should be sufficient
  45. const CHECK_STALE_CLIENTS_INTERVAL_SECONDS = 60;
  46. const MANAGEMENT_INC_PORT = 1000;
  47. const WORKER_DEAD_TIMEOUT_MSEC = 10000;
  48. const CHECKWORKERS_RUN_INTERVAL_SECONDS = 30;
  49.  
  50. const clusterPort = argv.nodePort ||  conf.serverPort;
  51. let managementPort: number = conf.managementPort - 0;
  52.  
  53. const firstAvailablePort = 8040;
  54. const availablePorts = [
  55.     8040, 8041, 8042, 8043, 8044, 8045, 8046, 8047,
  56.     8048, 8049, 8050, 8051, 8052, 8053, 8054, 8055
  57. ];
  58.  
  59. let numCPUs = os.cpus().length;
  60. if(numCPUs > availablePorts.length) {
  61.     numCPUs = availablePorts.length;
  62. }
  63.  
  64. const workersMap = {};
  65.  
  66. function dropStaleClients(): void {
  67.     let clients: number = 0;
  68.     let staleClients: number = 0;
  69.  
  70.     for(const workerId in workersMap) {
  71.         let atime: number = 0;
  72.         const worker = workersMap[workerId];
  73.         const clientsMap = worker.clientsMap;
  74.         for(const sid in clientsMap) {
  75.             const client = clientsMap[sid];
  76.             if(client.atime > atime) {
  77.                 atime = client.atime;
  78.             }
  79.             const now = new Date().getTime();
  80.             if((client.atime + (CLIENT_IS_STALE_TIMEOUT_MINUTES * Numbers.SECONDS_IN_MINUTE * Numbers.MILLIS_IN_SECOND)) < now) {
  81.                 log.warn(`Dropping stale client ${sid}`);
  82.                 delete clientsMap[sid];
  83.                 staleClients ++;
  84.             } else {
  85.                 clients ++;
  86.             }
  87.         }
  88.  
  89.         const ago = atime ? `${Math.floor((new Date().getTime() - atime) / Numbers.MILLIS_IN_SECOND)} seconds ago` : 'n/a';
  90.         const clientsCount = _.keys(worker.clientsMap).length;
  91.         log.warn(`Workers stats: id ${workerId}, clients ${clientsCount}, last activity ${ago}`);
  92.     }
  93.  
  94.     log.warn(`dropStaleClients: clients ${clients}, stale clients purged ${staleClients}`);
  95. }
  96.  
  97. function getMostFreeWorker(): Worker {
  98.     let freePid = _.keys(workersMap)[0];
  99.     let minClients = _.keys(workersMap[freePid].clientsMap).length;
  100.  
  101.     for(const pid in workersMap) {
  102.         const worker = workersMap[pid];
  103.         const clientsCount = _.keys(worker.clientsMap).length;
  104.         if(clientsCount < minClients) {
  105.             freePid = pid;
  106.             minClients = clientsCount;
  107.         }
  108.     }
  109.  
  110.     return workersMap[freePid];
  111. }
  112.  
  113. function getClientWorker(opt: { sid: string }): Worker {
  114.     const sid = opt.sid;
  115.  
  116.     if(!sid) {
  117.         // should not happen
  118.         const _err = `getClientWorker: no sid`;
  119.         log.error(_err);
  120.         throw new Error(_err);
  121.     }
  122.  
  123.     let workerPid: string;
  124.     for(const pid in workersMap) {
  125.         if(workersMap[pid].clientsMap[sid]) {
  126.             workerPid = pid;
  127.             break;
  128.         }
  129.     }
  130.  
  131.     const worker: Worker = workersMap[workerPid];
  132.  
  133.     return worker;
  134. }
  135.  
  136. function getFreePort(): number {
  137.     let ports = _.clone(availablePorts);
  138.  
  139.     for(const pid in workersMap) {
  140.         const worker: Worker = workersMap[pid];
  141.         const idx = ports.indexOf(worker.port);
  142.         if(idx >= 0) {
  143.             ports.splice(idx, 1);
  144.         }
  145.     }
  146.  
  147.     return ports[0] || 0;
  148. }
  149.  
  150. function getOrAllocateWorker(opt: { sid: string }): Worker {
  151.     const sid: string = opt.sid;
  152.     let worker: Worker = getClientWorker({ sid });
  153.     if(!worker) {
  154.         worker = getMostFreeWorker();
  155.         if(!worker) {
  156.             // should not happen
  157.             throw new Error('getOrAllocateWorker: no free worker');
  158.         }
  159.  
  160.         const ctime = new Date().getTime();
  161.         const atime = ctime;
  162.         worker.clientsMap[sid] = {
  163.             ctime,
  164.             atime
  165.         };
  166.         log.warn({
  167.             idx: worker.idx,
  168.             workerId: worker.pid,
  169.             sid
  170.         }, `Allocated worker pid ${worker.pid} idx ${worker.idx} for sid ${sid}`);
  171.     }
  172.  
  173.     return worker;
  174. }
  175.  
  176. async function waitWorkerMsg(opt: { worker: any, type: string }): Promise<any> {
  177.     const worker = opt.worker;
  178.     const type = opt.type;
  179.  
  180.     return new Promise((resolve) => {
  181.         const handler = (msg) => {
  182.             if(msg.type === type) {
  183.                 worker.removeListener('message', handler);
  184.                 resolve(msg);
  185.             }
  186.         };
  187.  
  188.         worker.on('message', handler);
  189.     });
  190. }
  191.  
  192. async function createWorker(): Promise<void> {
  193.     const port: number = getFreePort();
  194.     if(!port) {
  195.         // should not happen
  196.         const _err = 'createWorker: no free port';
  197.         log.error(_err);
  198.         throw new Error(_err);
  199.     }
  200.  
  201.     let worker = cluster.fork();
  202.     const pid = worker.process.pid;
  203.  
  204.     log.info(`Worker created, pid ${pid}, port ${port}`);
  205.  
  206.     worker.send({
  207.         type: 'startWorker',
  208.         data: {
  209.             port
  210.         }
  211.     });
  212.     await waitWorkerMsg({
  213.         worker,
  214.         type: 'workerReady'
  215.     });
  216.  
  217.     const idx = port - firstAvailablePort;
  218.     workersMap[pid] = {
  219.         worker,
  220.         clientsMap: {},
  221.         port,
  222.         pid,
  223.         idx
  224.     };
  225. }
  226.  
  227. function updateClientAccessTime(opt: { sid: string }): void {
  228.     const sid: string = opt.sid;
  229.  
  230.     let updated: number = 0;
  231.     for(const workerId in workersMap) {
  232.         const worker = workersMap[workerId];
  233.         if(worker.clientsMap[sid]) {
  234.             updated ++;
  235.             worker.clientsMap[sid].atime = new Date().getTime();
  236.         }
  237.     }
  238.  
  239.     if(!updated) {
  240.         log.error(`updateClientAccessTime: no worker for sid ${sid}`);
  241.     }
  242.  
  243.     if(updated > 1) {
  244.         log.error(`updateClientAccessTime: client on several workers!!!, sid ${sid}`);
  245.     }
  246. }
  247.  
  248. const prometheusPrefix: string = 'app-provisioning';
  249. async function getPrometheus() {
  250.     const prefix: string = prometheusPrefix;
  251.  
  252.     const lines: string[] = [];
  253.  
  254.     for(const pid in workersMap) {
  255.         const worker = workersMap[pid];
  256.         const workerIdx = worker.idx;
  257.         const port = worker.port + MANAGEMENT_INC_PORT;
  258.  
  259.         const httpClient = new HTTPClient({
  260.             host: '127.0.0.1',
  261.             port: port
  262.         });
  263.  
  264.         let res: any = await httpClient.get({
  265.             path: '/prometheus'
  266.         });
  267.         res = res.body;
  268.  
  269.         const metricTypes = res.metricTypes;
  270.         const data = res.data;
  271.         const sections = metricTypes.sections;
  272.         for(const section of sections) {
  273.             const sectionName = `worker${workerIdx}_${section.name}`;
  274.             const descr = section.descr;
  275.             const type = section.type;
  276.             const key = `${prefix}_${sectionName}`;
  277.  
  278.             lines.push(`# HELP ${key} ${descr}`);
  279.             lines.push(`# TYPE ${key} ${type}`);
  280.             for(const metric of section.metrics) {
  281.                 const metricName = metric.name;
  282.                 let value = data[metricName];
  283.                 if(typeof value == 'object') {
  284.                     value = JSON.stringify(value);
  285.                 }
  286.  
  287.                 lines.push(`${key}{serviceName="${prometheusPrefix}",metricName="${metricName}"} ${value}`);
  288.             }
  289.         }
  290.     }
  291.  
  292.     return {
  293.         output: lines
  294.     }
  295. }
  296.  
  297. function userIsAdmin (_request: Request): Promise<boolean> {
  298.     return new Promise((resolve) => {
  299.         let credentials = auth(_request);
  300.         if (!credentials) {
  301.             resolve(false);
  302.             return;
  303.         }
  304.         if(credentials.name !== conf.adminUsername) {
  305.             resolve(false);
  306.             return;
  307.         }
  308.         bcrypt.compare(credentials.pass, conf.adminPassword, (err, result) => {
  309.             if (err || !result) {
  310.                 resolve(false);
  311.             } else {
  312.                 resolve(true);
  313.             }
  314.         });
  315.     });
  316. }
  317.  
  318. async function getHealth(opt: { authenticated: boolean }) {
  319.     let status = 'UP';
  320.     if(_.keys(workersMap).length === 0) {
  321.         status = 'DOWN';
  322.     }
  323.  
  324.     return {
  325.         status
  326.     }
  327. }
  328.  
  329. async function checkWorkers(): Promise<void> {
  330.     log.warn('checkWorkers: start');
  331.  
  332.     for(const pid in workersMap) {
  333.         const worker = workersMap[pid];
  334.  
  335.         const httpClient = new HTTPClient({
  336.             host: '127.0.0.1',
  337.             port: worker.port + MANAGEMENT_INC_PORT
  338.         });
  339.  
  340.         const resp = await httpClient.get({
  341.             path: '/health',
  342.             timeout: WORKER_DEAD_TIMEOUT_MSEC
  343.         });
  344.         if(resp.error) {
  345.             // worker considered dead
  346.             log.error(`worker pid ${pid} returned error: '${resp.error}', killing`);
  347.             worker.worker.kill(SIGCONT);
  348.             worker.worker.kill(SIGKILL);
  349.         }
  350.     }
  351.  
  352.     setTimeout(() => {
  353.         checkWorkers();
  354.     }, CHECKWORKERS_RUN_INTERVAL_SECONDS * Numbers.MILLIS_IN_SECOND);
  355. }
  356.  
  357. function createManagementService() {
  358.     const servicesApp = express();
  359.     const servicesServer = http.createServer(servicesApp);
  360.     servicesServer.listen(managementPort, function () {
  361.  
  362.         servicesApp.use(bodyParser.json({limit: '50mb'}));
  363.  
  364.         servicesApp.route('/prometheus').get(async (req, res) => {
  365.             const data = await getPrometheus();
  366.  
  367.             res.setHeader('content-type', 'text/plain');
  368.             res.send(data.output.join('\n'));
  369.         });
  370.  
  371.         servicesApp.route('/info').get((req, res) => {
  372.             let resp = info.getInfo();
  373.             res.json(resp);
  374.         });
  375.         //
  376.         servicesApp.route('/health').get(async (req, res) => {
  377.             const isAdmin = await userIsAdmin(req);
  378.             const resp = await getHealth({
  379.                 authenticated: isAdmin
  380.             });
  381.  
  382.             res.json(resp);
  383.         });
  384.  
  385.         // servicesApp.route('/metrics').get(function (req, response) {
  386.         //     let _htcc = copyHTCC(req, htcc);
  387.         //     let query = req.query || {};
  388.         //
  389.         //     Metrics.getStats({
  390.         //         htcc: _htcc,
  391.         //         ext: query.ext,
  392.         //         urls: query.urls,
  393.         //         users: query.users
  394.         //     }, function (err, resp) {
  395.         //         if (err) {
  396.         //             return response.status(HttpStatus.INTERNAL_SERVER_ERROR).send(commonLib.statusV2ToV3(err));
  397.         //         }
  398.         //         response.setHeader('content-type', 'text/plain');
  399.         //         response.end(JSON.stringify(resp, null, Numbers.JSON_STRINGIFY_INDENT));
  400.         //     });
  401.         // });
  402.  
  403.         // servicesApp.post('/loggers/ROOT', function(req, response) {
  404.         //     checkAdminCredentials(req, function(err) {
  405.         //         if(err) {
  406.         //             return response.status(HttpStatus.FORBIDDEN).send({
  407.         //                 status: {
  408.         //                     code: 403,
  409.         //                     message: 'Access denied'
  410.         //                 }
  411.         //             });
  412.         //         }
  413.         //
  414.         //         let body = req.body;
  415.         //
  416.         //         let configuredLevel = body.configuredLevel || 'trace';
  417.         //         configuredLevel = configuredLevel.toLowerCase();
  418.         //
  419.         //         runtimeMgmt.changeLogLevel({
  420.         //             configuredLevel
  421.         //         }).then(function(resp) {
  422.         //             response.json({
  423.         //                 status: {
  424.         //                     code: 0,
  425.         //                     data: resp.data
  426.         //                 }
  427.         //             });
  428.         //         }, function(_err) {
  429.         //             response.status(HttpStatus.INTERNAL_SERVER_ERROR).send({
  430.         //                 status: {
  431.         //                     code: Numbers.gwsStatusCodes.INTERNAL_ERROR,
  432.         //                     message: _err.toString()
  433.         //                 }
  434.         //             });
  435.         //         });
  436.         //     });
  437.         });
  438.  
  439.         log.info(`Secondary services on http://localhost:${managementPort}/`);
  440. }
  441.  
  442. function proxyResHandler(proxyRes, req, res) {
  443.     const setCookie = _.get(proxyRes, [ 'headers', 'set-cookie' ]);
  444.     if(setCookie) {
  445.         let sid: string = null;
  446.         for(let i = 0; i < setCookie.length; i ++) {
  447.             const cookie = setCookie[i];
  448.             const m = cookie.match(new RegExp(`^${conf.cookieName}=([^; ]+)`));
  449.             if(m) {
  450.                 sid = m[1];
  451.                 break;
  452.             }
  453.         }
  454.         if(sid) {
  455.             try {
  456.                 const worker: Worker = getOrAllocateWorker({
  457.                     sid
  458.                 });
  459.                 console.log('path', req.url);
  460.                 log.warn(`Allocated worker ${worker.pid} idx ${worker.idx} for sid ${sid} after set-cookie`);
  461.             } catch(e) {
  462.                 log.error({
  463.                     message: e.message,
  464.                     stack: e.stack
  465.                 }, `Failed to get or allocate worker: ${e.message}`);
  466.                 return;
  467.             }
  468.         }
  469.     }
  470. }
  471.  
  472. async function startMaster() {
  473.     log.info(`Provisioning Cluster Master Process ${process.pid} is running, using ${numCPUs} CPUs`);
  474.  
  475.     await trackers.run({
  476.         n: 'Remove old PostgreSQL DBs',
  477.         o: ObjectCache,
  478.         f: 'removeObjectCachePostgresDBs'
  479.     });
  480.  
  481.     const proxy = httpProxy.createProxyServer({});
  482.     proxy.on('error', function(e) {
  483.         log.error({
  484.             message: e.message,
  485.             stack: e.stack
  486.         }, `failed to proxy req: ${e.message}`);
  487.     });
  488.     proxy.on('proxyRes', proxyResHandler);
  489.  
  490.     for (let i = 0; i < numCPUs; i ++) {
  491.         await createWorker();
  492.     }
  493.  
  494.     const server = http.createServer(async (req, res) => {
  495.         const cookies: any = cookie.parse(_.get(req, 'headers.cookie') || '');
  496.         let sid = cookies[conf.cookieName];
  497.         if(!sid) {
  498.             sid = 'UNAUTHORIZED';
  499.         }
  500.  
  501.         let worker;
  502.         try {
  503.             worker = getOrAllocateWorker({
  504.                 sid
  505.             });
  506.         } catch(e) {
  507.             log.error({
  508.                 message: e.message,
  509.                 stack: e.stack
  510.             }, `Failed to get or allocate worker: ${e.message}`);
  511.             return;
  512.         }
  513.  
  514.         const workerPort = worker.port;
  515.  
  516.         updateClientAccessTime({ sid });
  517.  
  518.         proxy.web(req, res, {target: `http://127.0.0.1:${workerPort}`});
  519.     });
  520.     server.listen(clusterPort);
  521.  
  522.     cluster.on('exit', (worker, code, signal) => {
  523.         log.error(`Worker ${worker.process.pid} died, exit code ${code}`);
  524.         delete workersMap[worker.process.pid];
  525.  
  526.         createWorker();
  527.     });
  528.  
  529.     setInterval(dropStaleClients, CHECK_STALE_CLIENTS_INTERVAL_SECONDS * Numbers.MILLIS_IN_SECOND);
  530.  
  531.     if(managementPort) {
  532.         createManagementService();
  533.     }
  534.  
  535.     checkWorkers();
  536.  
  537.     log.warn(`Provisioning Cluster ready at port ${clusterPort}`);
  538. }
  539.  
  540. // MAIN LOOP
  541. if (cluster.isMaster) {
  542.     startMaster();
  543. } else {
  544.     // worker side
  545.     process.on('message', (msg) => {
  546.         if(msg.type === 'startWorker') {
  547.             const port = msg.data.port;
  548.             process.env.PROVISIONING_INTERNAL_PORT = port;
  549.             require('./agentsetup');
  550.             process.send({
  551.                 type: 'workerReady',
  552.                 data: {}
  553.             });
  554.         }
  555.     });
  556. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement