SHARE
TWEET

Untitled

a guest Apr 21st, 2019 82 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. "use strict";
  2. import Queue from "bull";
  3. import Connection from "./DB/mongoDB";
  4. import path from "path";
  5. import url from "url";
  6. import Arena from "bull-arena";
  7. import logger from "./Helpers/logger";
  8.  
  9. //{qName:{Q: Bull:Queue, status:[Paused,Running,Disabled...] }}
  10. const AllQueues = {};
  11.  
  12. export var InitiateJobs = async function(app, redisURL, mongoDBName) {
  13.   await Connection.connectToMongo();
  14.   let db = Connection.client.db(mongoDBName);
  15.   const collection = db.collection("SysJobType");
  16.   let jobs = await collection.find().toArray();
  17.   jobs.forEach(job => {
  18.     if (job.IsEnabled === false) {
  19.       //don't create a queue for the disabled job but catch to show it in UI
  20.       AllQueues[job.name] = {};
  21.       AllQueues[job.name].status = "Disabled";
  22.     } else {
  23.       let queue = createJobQueue(
  24.         job.name,
  25.         redisURL,
  26.         job.queueOptions,
  27.         job.autoGenerate,
  28.         job.JobData,
  29.         job.JobOptions,
  30.         job.parallelJobCount,
  31.         `${path.join(__dirname, `${job.processor}`)}`
  32.       );
  33.     }
  34.   });
  35.   //---
  36.   setupArena(app, redisURL);
  37. };
  38.  
  39. //create a queue for each
  40. /**
  41.  * @param {string} name
  42.  * @param {string} redisURL
  43.  * @param {Queue.QueueOptions} queueOptions
  44.  * @param {boolean} autoGenerate
  45.  * @param {any} jobData
  46.  * @param {Queue.JobOptions} jobOptions
  47.  * @param {number} parallelJobCount
  48.  * @param {string | ((job: Queue.Job<any>, done: Queue.DoneCallback) => void) | ((job: Queue.Job<any>) => Promise<any>)} processor
  49.  */
  50. function createJobQueue(
  51.   name,
  52.   redisURL,
  53.   queueOptions,
  54.   autoGenerate,
  55.   jobData,
  56.   jobOptions,
  57.   parallelJobCount,
  58.   processor
  59. ) {
  60.   try {
  61.     logger.info(
  62.       `adding job ${JSON.stringify({
  63.         name,
  64.         redisURL,
  65.         queueOptions,
  66.         autoGenerate,
  67.         jobData,
  68.         jobOptions,
  69.         parallelJobCount,
  70.         processor
  71.       })}`
  72.     );
  73.     AllQueues[name] = {};
  74.     AllQueues[name].Q = new Queue(name, redisURL, {
  75.       ...queueOptions,
  76.       redis: { showFriendlyErrorStack: true }
  77.     });
  78.     //a job can be auto generated like repeatable jobs
  79.     if (autoGenerate === true) {
  80.       AllQueues[name].Q.add(jobData, jobOptions);
  81.     }
  82.     //---add processor for the job
  83.     AllQueues[name].Q.process(+parallelJobCount, processor);
  84.     //--set the initial status
  85.     AllQueues[name].status = "Running";
  86.     //----
  87.     AllQueues[name].Q.on("failed", (job, err) => {
  88.       logger.error(`job ${JSON.stringify(job)} failed details: %o`, err);
  89.     });
  90.     AllQueues[name].Q.on("error", err => {
  91.       logger.error(`uncaught error in queue %o`, err);
  92.     });
  93.     AllQueues[name].Q.on("paused", () => {
  94.       AllQueues[name].status = "Paused";
  95.       logger.info(`queue ${name} is paused`);
  96.     });
  97.     AllQueues[name].Q.on("resumed", () => {
  98.       AllQueues[name].status = "Running";
  99.       logger.info(`queue ${name} is resumed`);
  100.     });
  101.     return AllQueues[name].Q;
  102.   } catch (error) {
  103.     logger.error("failed to add job %o", error);
  104.     return undefined;
  105.   }
  106. }
  107.  
  108. export var pauseQueue = async function(queueName) {
  109.   try {
  110.     let result = await AllQueues[queueName].Q.pause(false);
  111.     logger.info(result);
  112.     return result;
  113.   } catch (error) {
  114.     logger.error("failed to pause queue %o", error);
  115.     return undefined;
  116.   }
  117. };
  118.  
  119. export var resumeQueue = async function(queueName) {
  120.   try {
  121.     let result = await AllQueues[queueName].Q.resume();
  122.     logger.info(result);
  123.     return result;
  124.   } catch (error) {
  125.     logger.error("failed to resume queue %o", error);
  126.     return undefined;
  127.   }
  128. };
  129.  
  130. export var getJobs = async function(queueName, status, start, end) {
  131.   logger.debug("%o", { queueName, status, start, end });
  132.   start = start || 0;
  133.   end = end || -1;
  134.   //capitalize first char
  135.   status = status.charAt(0).toUpperCase() + status.slice(1);
  136.   let funcName = `get${status}`;
  137.   let jobs = await AllQueues[queueName].Q[funcName](start, end);
  138.   return jobs;
  139. };
  140.  
  141. function setupArena(app, redisURL) {
  142.   const QueuesToMonitor = [];
  143.   for (let qName in AllQueues) {
  144.     if (AllQueues[qName].status && AllQueues[qName].status !== "Disabled") {
  145.       QueuesToMonitor.push({
  146.         name: qName,
  147.         hostId: "MMNet",
  148.         redis: getRedisConfig(redisURL)
  149.       });
  150.     }
  151.   }
  152.   app.use(
  153.     "/",
  154.     Arena(
  155.       {
  156.         queues: QueuesToMonitor
  157.       },
  158.       {
  159.         basePath: "/arena",
  160.         disableListen: true,
  161.         useCdn: false
  162.       }
  163.     )
  164.   );
  165. }
  166.  
  167. // helper
  168. /**
  169.  * @param {string} redisUrl
  170.  */
  171. function getRedisConfig(redisUrl) {
  172.   const redisConfig = url.parse(redisUrl);
  173.   return {
  174.     host: redisConfig.hostname || "localhost",
  175.     port: Number(redisConfig.port || 6379),
  176.     database: (redisConfig.pathname || "/0").substr(1) || "0",
  177.     password: redisConfig.auth ? redisConfig.auth.split(":")[1] : undefined
  178.   };
  179. }
  180.  
  181. export var GetAllQueues = function() {
  182.   return AllQueues;
  183. };
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top