Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import * as cluster from 'cluster';
- import * as os from 'os';
- import * as _ from 'lodash';
- import * as logger from './services/bunyan-logger';
- import * as cookie from 'cookie';
- import * as express from 'express';
- import { argv } from 'optimist';
- import * as bodyParser from 'body-parser';
- import * as auth from 'basic-auth';
- import * as bcrypt from 'bcryptjs';
- import {Request} from 'express-serve-static-core';
- import {SIGKILL} from "constants";
- import {SIGCONT} from "constants";
- import * as http from 'http';
- import * as httpProxy from 'http-proxy';
- import {conf} from "./conf/conf";
- // import {commonLib} from "./commonLib";
- import {Numbers} from "./numbers";
- import {HTTPClient} from "./services/http-client";
- import {info} from "./services/info";
- import {ObjectCache} from "./services/objectCache/objectCache";
- import {trackers} from "./singleton/trackers";
// Child logger carrying a `from: 'provisioning-app'` field on every record it emits.
const log = logger.child({ from: 'provisioning-app' });

// Last-resort hook: log the reason of any promise rejection nobody handled.
process.on('unhandledRejection', (reason) => {
    log.error(reason);
});
/** Bookkeeping stored for every client session (sid) pinned to a worker. */
type ClientEntry = {
    /** Epoch milliseconds at which the sid was assigned to the worker. */
    ctime: number;
    /** Epoch milliseconds of the last access recorded for the sid. */
    atime: number;
};

/** Master-side record describing one forked worker process. */
type Worker = {
    /** Handle returned by `cluster.fork()`; left loosely typed because callers pass numeric signals to `kill`. */
    worker: any;
    /** Creation timestamp. NOTE(review): confirm every producer of this record actually sets it. */
    ctime: number;
    /** Sessions currently routed to this worker, keyed by sid. */
    clientsMap: Record<string, ClientEntry>;
    /** Port the worker was told to listen on. */
    port: number;
    /** OS process id of the worker. */
    pid: number;
    /** Offset of `port` from the first port of the pool. */
    idx: number;
};
// An alive client keeps a cometd long-poll connection open, so 2 minutes of silence should be sufficient to call it stale.
const CLIENT_IS_STALE_TIMEOUT_MINUTES = 2;
// How often the stale-client sweep runs.
const CHECK_STALE_CLIENTS_INTERVAL_SECONDS = 60;
// Added to a worker's port to obtain the port its management endpoints are queried on.
const MANAGEMENT_INC_PORT = 1000;
// Timeout passed to the per-worker health request.
const WORKER_DEAD_TIMEOUT_MSEC = 10000;
// Delay between two consecutive worker health sweeps.
const CHECKWORKERS_RUN_INTERVAL_SECONDS = 30;

// Public port of the cluster: the command-line value wins over the configured one.
const clusterPort = argv.nodePort || conf.serverPort;
// Coerced to a number; a falsy result disables the management service.
let managementPort: number = Number(conf.managementPort);

// Pool of 16 consecutive ports that workers may be bound to.
const firstAvailablePort = 8040;
const availablePorts = Array.from({ length: 16 }, (_unused, offset) => firstAvailablePort + offset);

// Never start more workers than there are ports in the pool.
let numCPUs = Math.min(os.cpus().length, availablePorts.length);

// Registry of live workers, keyed by worker pid.
const workersMap = {};
/**
 * Sweeps every worker's client table, removing sessions whose last access is
 * older than the stale timeout, and logs per-worker and overall statistics.
 */
function dropStaleClients(): void {
    const staleAfterMs = CLIENT_IS_STALE_TIMEOUT_MINUTES * Numbers.SECONDS_IN_MINUTE * Numbers.MILLIS_IN_SECOND;
    let clients: number = 0;
    let staleClients: number = 0;

    for (const workerId in workersMap) {
        const record = workersMap[workerId];
        const sessions = record.clientsMap;
        // Most recent access time among this worker's clients (0 when it has none).
        let latestAccess: number = 0;

        for (const sid in sessions) {
            const seenAt = sessions[sid].atime;
            if (latestAccess < seenAt) {
                latestAccess = seenAt;
            }

            const expired = (seenAt + staleAfterMs) < Date.now();
            if (!expired) {
                clients ++;
                continue;
            }
            log.warn(`Dropping stale client ${sid}`);
            delete sessions[sid];
            staleClients ++;
        }

        let ago = 'n/a';
        if (latestAccess) {
            ago = `${Math.floor((Date.now() - latestAccess) / Numbers.MILLIS_IN_SECOND)} seconds ago`;
        }
        // Counted after the purge, so it reflects the surviving clients only.
        const clientsCount = _.keys(record.clientsMap).length;
        log.warn(`Workers stats: id ${workerId}, clients ${clientsCount}, last activity ${ago}`);
    }

    log.warn(`dropStaleClients: clients ${clients}, stale clients purged ${staleClients}`);
}
/**
 * Picks the registered worker that currently serves the fewest clients.
 * Ties go to the worker that comes first in the registry's iteration order.
 *
 * @returns the least loaded worker, or `undefined` when no worker is
 *          registered (previously this case crashed with a TypeError, which
 *          made the caller's "no free worker" check unreachable).
 */
function getMostFreeWorker(): Worker {
    let best: Worker;
    let bestLoad = Infinity;
    for (const pid in workersMap) {
        const candidate: Worker = workersMap[pid];
        const load = _.keys(candidate.clientsMap).length;
        // Strict comparison keeps the first of equally loaded workers.
        if (load < bestLoad) {
            best = candidate;
            bestLoad = load;
        }
    }
    return best;
}
/**
 * Finds the worker a session is already pinned to.
 *
 * @param opt.sid session id to look up
 * @returns the worker whose client table contains `sid`, or `undefined` when
 *          the session is not registered anywhere
 * @throws Error when `sid` is empty
 */
function getClientWorker(opt: { sid: string }): Worker {
    const { sid } = opt;
    if (!sid) {
        // should not happen
        const _err = `getClientWorker: no sid`;
        log.error(_err);
        throw new Error(_err);
    }

    for (const pid in workersMap) {
        const candidate: Worker = workersMap[pid];
        if (candidate.clientsMap[sid]) {
            return candidate;
        }
    }
    return undefined;
}
/**
 * Returns the first port of the pool that no registered worker occupies.
 *
 * @returns a free port, or 0 when the pool is exhausted
 */
function getFreePort(): number {
    const occupied: number[] = [];
    for (const pid in workersMap) {
        occupied.push((workersMap[pid] as Worker).port);
    }
    const free = availablePorts.filter((candidate) => occupied.indexOf(candidate) < 0);
    return free.length > 0 ? free[0] : 0;
}
/**
 * Resolves the worker that must serve a session: the one it is already pinned
 * to, or else the least loaded worker, which the session is then registered on.
 *
 * @param opt.sid session id
 * @returns the worker serving `sid`
 * @throws Error when `sid` is empty (raised by the lookup) or when no worker
 *         could be picked
 */
function getOrAllocateWorker(opt: { sid: string }): Worker {
    const { sid } = opt;

    const pinned: Worker = getClientWorker({ sid });
    if (pinned) {
        return pinned;
    }

    const chosen: Worker = getMostFreeWorker();
    if (!chosen) {
        // should not happen
        throw new Error('getOrAllocateWorker: no free worker');
    }

    // Creation and last-access stamps start out identical.
    const stamp = new Date().getTime();
    chosen.clientsMap[sid] = { ctime: stamp, atime: stamp };

    const logFields = { idx: chosen.idx, workerId: chosen.pid, sid };
    log.warn(logFields, `Allocated worker pid ${chosen.pid} idx ${chosen.idx} for sid ${sid}`);
    return chosen;
}
/**
 * Waits for the first 'message' event on `worker` whose payload has the given
 * `type`, then detaches the listener and resolves with that payload.
 * Messages of any other type are ignored. The promise never rejects and has
 * no timeout: it stays pending until a matching message arrives.
 *
 * @param opt.worker emitter of 'message' events (needs `on` and `removeListener`)
 * @param opt.type   value of `msg.type` to wait for
 * @returns the matching message object
 */
async function waitWorkerMsg(opt: { worker: any, type: string }): Promise<any> {
    const { worker: source, type: expectedType } = opt;

    return new Promise((done) => {
        function onMessage(message: { type?: string }) {
            if (message.type !== expectedType) {
                return;
            }
            source.removeListener('message', onMessage);
            done(message);
        }

        source.on('message', onMessage);
    });
}
// Ports handed to workers that were forked but are not registered in workersMap yet.
// Without this reservation two overlapping createWorker() calls (e.g. two workers
// dying close together) would both be given the same port.
const portsBeingStarted = new Set<number>();

/**
 * Forks a worker, tells it which port to serve on, waits until it reports
 * 'workerReady' and only then registers it in the worker registry.
 *
 * The port stays reserved for the whole start-up and is released afterwards,
 * whether the start-up succeeded or failed.
 *
 * @throws Error when the port pool is exhausted, or when the worker exits
 *         before reporting that it is ready (instead of waiting forever)
 */
async function createWorker(): Promise<void> {
    // Same choice as the free-port lookup (lowest unoccupied port), but it also
    // skips ports reserved by start-ups that are still in progress.
    const busy = new Set<number>(portsBeingStarted);
    for (const registeredPid in workersMap) {
        busy.add(workersMap[registeredPid].port);
    }
    const port: number = availablePorts.find((candidate) => !busy.has(candidate)) || 0;
    if (!port) {
        // should not happen
        const _err = 'createWorker: no free port';
        log.error(_err);
        throw new Error(_err);
    }

    portsBeingStarted.add(port);
    try {
        const worker = cluster.fork();
        const pid = worker.process.pid;
        log.info(`Worker created, pid ${pid}, port ${port}`);

        // Subscribe before sending so the reply cannot be missed.
        const ready = waitWorkerMsg({
            worker,
            type: 'workerReady'
        });
        // A worker that dies during start-up never sends 'workerReady'.
        const exitedEarly = new Promise<never>((_resolve, reject) => {
            worker.once('exit', (code, signal) => {
                reject(new Error(`createWorker: worker pid ${pid} exited before becoming ready (code ${code}, signal ${signal})`));
            });
        });

        worker.send({
            type: 'startWorker',
            data: {
                port
            }
        });
        await Promise.race([ready, exitedEarly]);

        workersMap[pid] = {
            worker,
            ctime: new Date().getTime(),
            clientsMap: {},
            port,
            pid,
            idx: port - firstAvailablePort
        };
    } finally {
        portsBeingStarted.delete(port);
    }
}
/**
 * Stamps the current time as the last access of a session on every worker
 * that has it registered, and logs an error when the session is registered on
 * no worker or on more than one.
 *
 * @param opt.sid session id whose access time is refreshed
 */
function updateClientAccessTime(opt: { sid: string }): void {
    const { sid } = opt;
    let hits: number = 0;

    for (const workerId in workersMap) {
        const entry = workersMap[workerId].clientsMap[sid];
        if (!entry) {
            continue;
        }
        hits ++;
        entry.atime = new Date().getTime();
    }

    if (hits > 1) {
        log.error(`updateClientAccessTime: client on several workers!!!, sid ${sid}`);
    } else if (!hits) {
        log.error(`updateClientAccessTime: no worker for sid ${sid}`);
    }
}
// Prefix of every exported metric key and value of the `serviceName` label.
// NOTE(review): Prometheus' classic metric-name charset does not allow '-';
// confirm how the consumers of this endpoint treat these keys before renaming.
const prometheusPrefix: string = 'app-provisioning';

/**
 * Collects metrics from the management endpoint of every registered worker
 * and renders them as text exposition lines (HELP / TYPE / sample).
 *
 * A worker whose request fails, reports an error, or returns a payload
 * without metric sections is logged and skipped, so one broken worker no
 * longer fails the whole scrape.
 *
 * @returns `{ output }` where `output` is the list of rendered lines
 */
async function getPrometheus() {
    const prefix: string = prometheusPrefix;
    const lines: string[] = [];

    for (const pid in workersMap) {
        const worker = workersMap[pid];
        const httpClient = new HTTPClient({
            host: '127.0.0.1',
            port: worker.port + MANAGEMENT_INC_PORT
        });

        let payload: any;
        try {
            const res: any = await httpClient.get({
                path: '/prometheus'
            });
            if (res && res.error) {
                throw new Error(String(res.error));
            }
            payload = res && res.body;
        } catch (e) {
            log.error(`getPrometheus: worker pid ${pid} did not return metrics: ${e.message}`);
            continue;
        }

        const sections = _.get(payload, [ 'metricTypes', 'sections' ]);
        if (!Array.isArray(sections)) {
            log.error(`getPrometheus: worker pid ${pid} returned no metric sections`);
            continue;
        }
        const data = _.get(payload, 'data') || {};

        for (const section of sections) {
            const key = `${prefix}_worker${worker.idx}_${section.name}`;
            lines.push(`# HELP ${key} ${section.descr}`);
            lines.push(`# TYPE ${key} ${section.type}`);
            for (const metric of section.metrics || []) {
                const metricName = metric.name;
                let value = data[metricName];
                // Objects (and null) are rendered as JSON rather than "[object Object]".
                if (typeof value === 'object') {
                    value = JSON.stringify(value);
                }
                lines.push(`${key}{serviceName="${prometheusPrefix}",metricName="${metricName}"} ${value}`);
            }
        }
    }

    return {
        output: lines
    };
}
/**
 * Checks the basic-auth credentials of a request against the configured
 * admin account: the user name must equal `conf.adminUsername` and the
 * password must match `conf.adminPassword` according to `bcrypt.compare`.
 *
 * @param _request incoming request carrying the credentials
 * @returns a promise resolving to `true` only when both checks pass; missing
 *          credentials, a wrong user name or a comparison error give `false`
 */
function userIsAdmin (_request: Request): Promise<boolean> {
    return new Promise((resolve) => {
        const credentials = auth(_request);
        const isAdminName = Boolean(credentials) && credentials.name === conf.adminUsername;
        if (!isAdminName) {
            resolve(false);
            return;
        }

        bcrypt.compare(credentials.pass, conf.adminPassword, (err, matches) => {
            resolve(!err && Boolean(matches));
        });
    });
}
/**
 * Reports the health of the cluster: 'UP' while at least one worker is
 * registered, 'DOWN' otherwise.
 *
 * @param opt.authenticated accepted but not used by the current implementation
 * @returns an object with a single `status` field
 */
async function getHealth(opt: { authenticated: boolean }) {
    const hasWorkers = _.keys(workersMap).length !== 0;
    return {
        status: hasWorkers ? 'UP' : 'DOWN'
    };
}
/**
 * Probes the `/health` management endpoint of every registered worker and
 * kills the workers whose response carries an error. Re-arms itself to run
 * again after CHECKWORKERS_RUN_INTERVAL_SECONDS.
 *
 * A probe that throws is logged and that worker is skipped for this round
 * (it is not killed). Previously such an exception aborted the function
 * before the next run was scheduled, which stopped health checking for good.
 */
async function checkWorkers(): Promise<void> {
    log.warn('checkWorkers: start');

    for (const pid in workersMap) {
        const worker = workersMap[pid];
        try {
            const httpClient = new HTTPClient({
                host: '127.0.0.1',
                port: worker.port + MANAGEMENT_INC_PORT
            });
            const resp = await httpClient.get({
                path: '/health',
                timeout: WORKER_DEAD_TIMEOUT_MSEC
            });
            if (resp.error) {
                // worker considered dead
                log.error(`worker pid ${pid} returned error: '${resp.error}', killing`);
                // NOTE(review): SIGCONT is sent first, presumably to resume a stopped process before the kill - confirm.
                worker.worker.kill(SIGCONT);
                worker.worker.kill(SIGKILL);
            }
        } catch (e) {
            log.error({
                message: e.message,
                stack: e.stack
            }, `checkWorkers: health probe of worker pid ${pid} failed: ${e.message}`);
        }
    }

    // Always schedule the next round, whatever happened above.
    setTimeout(() => {
        checkWorkers();
    }, CHECKWORKERS_RUN_INTERVAL_SECONDS * Numbers.MILLIS_IN_SECOND);
}
/**
 * Starts the secondary (management) HTTP service on `managementPort` with
 * three GET routes:
 *   /prometheus - aggregated worker metrics as plain text
 *   /info       - result of `info.getInfo()`
 *   /health     - cluster health status
 *
 * Middleware and routes are registered before the server starts listening.
 * Failures inside the asynchronous handlers are logged and answered with
 * HTTP 500; previously they left the request hanging and surfaced only as
 * unhandled rejections.
 */
function createManagementService() {
    const INTERNAL_SERVER_ERROR = 500;
    const servicesApp = express();
    const servicesServer = http.createServer(servicesApp);

    // Logs a handler failure and answers the request unless a reply is already under way.
    const replyWithFailure = (route: string, e: any, res: any): void => {
        log.error({
            message: e.message,
            stack: e.stack
        }, `management route ${route} failed: ${e.message}`);
        if (!res.headersSent) {
            res.status(INTERNAL_SERVER_ERROR).json({
                status: 'ERROR',
                message: e.message
            });
        }
    };

    servicesApp.use(bodyParser.json({limit: '50mb'}));

    servicesApp.route('/prometheus').get(async (req, res) => {
        try {
            const data = await getPrometheus();
            res.setHeader('content-type', 'text/plain');
            res.send(data.output.join('\n'));
        } catch (e) {
            replyWithFailure('/prometheus', e, res);
        }
    });

    servicesApp.route('/info').get((req, res) => {
        let resp = info.getInfo();
        res.json(resp);
    });

    servicesApp.route('/health').get(async (req, res) => {
        try {
            const isAdmin = await userIsAdmin(req);
            const resp = await getHealth({
                authenticated: isAdmin
            });
            res.json(resp);
        } catch (e) {
            replyWithFailure('/health', e, res);
        }
    });

    // servicesApp.route('/metrics').get(function (req, response) {
    //     let _htcc = copyHTCC(req, htcc);
    //     let query = req.query || {};
    //
    //     Metrics.getStats({
    //         htcc: _htcc,
    //         ext: query.ext,
    //         urls: query.urls,
    //         users: query.users
    //     }, function (err, resp) {
    //         if (err) {
    //             return response.status(HttpStatus.INTERNAL_SERVER_ERROR).send(commonLib.statusV2ToV3(err));
    //         }
    //         response.setHeader('content-type', 'text/plain');
    //         response.end(JSON.stringify(resp, null, Numbers.JSON_STRINGIFY_INDENT));
    //     });
    // });

    // servicesApp.post('/loggers/ROOT', function(req, response) {
    //     checkAdminCredentials(req, function(err) {
    //         if(err) {
    //             return response.status(HttpStatus.FORBIDDEN).send({
    //                 status: {
    //                     code: 403,
    //                     message: 'Access denied'
    //                 }
    //             });
    //         }
    //
    //         let body = req.body;
    //
    //         let configuredLevel = body.configuredLevel || 'trace';
    //         configuredLevel = configuredLevel.toLowerCase();
    //
    //         runtimeMgmt.changeLogLevel({
    //             configuredLevel
    //         }).then(function(resp) {
    //             response.json({
    //                 status: {
    //                     code: 0,
    //                     data: resp.data
    //                 }
    //             });
    //         }, function(_err) {
    //             response.status(HttpStatus.INTERNAL_SERVER_ERROR).send({
    //                 status: {
    //                     code: Numbers.gwsStatusCodes.INTERNAL_ERROR,
    //                     message: _err.toString()
    //                 }
    //             });
    //         });
    //     });
    // });

    servicesServer.listen(managementPort);
    log.info(`Secondary services on http://localhost:${managementPort}/`);
}
/**
 * 'proxyRes' hook of the proxy: when a proxied response sets the session
 * cookie (`conf.cookieName`), registers that session on a worker right away.
 *
 * Changes compared with the previous version:
 *  - the cookie value is decoded with `cookie.parse`, the same way the
 *    request path reads it, so both sides use the same sid key;
 *  - the cookie name is regex-escaped and the pattern is built once per call;
 *  - the stray `console.log` is gone; the request url is attached to the
 *    log record instead.
 *
 * @param proxyRes response received from the worker
 * @param req      original client request
 * @param res      client response (unused)
 */
function proxyResHandler(proxyRes, req, res) {
    const setCookie = _.get(proxyRes, [ 'headers', 'set-cookie' ]);
    if (!setCookie) {
        return;
    }

    const sessionCookiePattern = new RegExp(`^${_.escapeRegExp(conf.cookieName)}=([^; ]+)`);
    let sid: string = null;
    for (const rawCookie of _.castArray(setCookie)) {
        const m = String(rawCookie).match(sessionCookiePattern);
        if (m) {
            // m[0] is "name=value"; parsing it applies the same decoding as for request cookies.
            sid = cookie.parse(m[0])[conf.cookieName] || m[1];
            break;
        }
    }
    if (!sid) {
        return;
    }

    try {
        const worker: Worker = getOrAllocateWorker({
            sid
        });
        log.warn({
            path: req.url
        }, `Allocated worker ${worker.pid} idx ${worker.idx} for sid ${sid} after set-cookie`);
    } catch (e) {
        log.error({
            message: e.message,
            stack: e.stack
        }, `Failed to get or allocate worker: ${e.message}`);
    }
}
/**
 * Boots the master process: runs the start-up tracker job, forks one worker
 * per usable CPU, starts the public sticky-session proxy on `clusterPort`,
 * restarts workers that exit, schedules the stale-client sweep, optionally
 * starts the management service and kicks off the worker health checks.
 *
 * Requests are routed by the session cookie (`conf.cookieName`); requests
 * without it share the pseudo session 'UNAUTHORIZED'.
 *
 * Changes compared with the previous version: a client now always gets an
 * answer (503 when no worker can be allocated, 502 when proxying fails)
 * instead of a hanging connection, and a failed worker restart is logged
 * explicitly.
 */
async function startMaster() {
    const BAD_GATEWAY = 502;
    const SERVICE_UNAVAILABLE = 503;

    log.info(`Provisioning Cluster Master Process ${process.pid} is running, using ${numCPUs} CPUs`);

    await trackers.run({
        n: 'Remove old PostgreSQL DBs',
        o: ObjectCache,
        f: 'removeObjectCachePostgresDBs'
    });

    const proxy = httpProxy.createProxyServer({});
    proxy.on('error', function(e, req, res) {
        log.error({
            message: e.message,
            stack: e.stack
        }, `failed to proxy req: ${e.message}`);
        // Only proxy.web() is used, so `res` is expected to be an HTTP response; guard anyway.
        const response = res as http.ServerResponse;
        if (response && typeof response.writeHead === 'function') {
            if (!response.headersSent) {
                response.writeHead(BAD_GATEWAY, { 'content-type': 'text/plain' });
            }
            response.end('Bad gateway');
        }
    });
    proxy.on('proxyRes', proxyResHandler);

    // Workers are started one at a time.
    for (let i = 0; i < numCPUs; i ++) {
        await createWorker();
    }

    const server = http.createServer((req, res) => {
        const cookies: any = cookie.parse(_.get(req, 'headers.cookie') || '');
        const sid = cookies[conf.cookieName] || 'UNAUTHORIZED';

        let worker;
        try {
            worker = getOrAllocateWorker({
                sid
            });
        } catch (e) {
            log.error({
                message: e.message,
                stack: e.stack
            }, `Failed to get or allocate worker: ${e.message}`);
            res.writeHead(SERVICE_UNAVAILABLE, { 'content-type': 'text/plain' });
            res.end('Service unavailable');
            return;
        }

        updateClientAccessTime({ sid });
        proxy.web(req, res, {target: `http://127.0.0.1:${worker.port}`});
    });
    server.listen(clusterPort);

    cluster.on('exit', (worker, code, signal) => {
        log.error(`Worker ${worker.process.pid} died, exit code ${code}`);
        // Dropping the record also drops its client table; those sessions get re-allocated on their next request.
        delete workersMap[worker.process.pid];
        createWorker().catch((e) => {
            log.error({
                message: e.message,
                stack: e.stack
            }, `Failed to replace worker ${worker.process.pid}: ${e.message}`);
        });
    });

    setInterval(dropStaleClients, CHECK_STALE_CLIENTS_INTERVAL_SECONDS * Numbers.MILLIS_IN_SECOND);

    if (managementPort) {
        createManagementService();
    }

    checkWorkers();

    log.warn(`Provisioning Cluster ready at port ${clusterPort}`);
}
// MAIN LOOP
// The same file runs in both roles: the master boots the cluster, while a
// worker waits for the master's 'startWorker' message before loading the app.
if (!cluster.isMaster) {
    // worker side
    process.on('message', (msg) => {
        if (msg.type !== 'startWorker') {
            return;
        }
        // Set before the require below; presumably ./agentsetup reads it - confirm.
        process.env.PROVISIONING_INTERNAL_PORT = msg.data.port;
        require('./agentsetup');
        // Tell the master that this worker finished loading.
        process.send({
            type: 'workerReady',
            data: {}
        });
    });
} else {
    startMaster();
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement