Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- "use strict";
- import Queue from "bull";
- import Connection from "./DB/mongoDB";
- import path from "path";
- import url from "url";
- import Arena from "bull-arena";
- import logger from "./Helpers/logger";
/**
 * Module-level registry of job queues, keyed by queue name.
 * Entry shape: { Q: Bull queue instance, status: "Paused" | "Running" | "Disabled" | ... }
 */
const AllQueues = {};
/**
 * Loads every job type from the "SysJobType" collection and prepares its queue.
 *
 * A job whose `IsEnabled` is exactly `false` is only recorded in the registry
 * with status "Disabled" (so it can still be listed); every other job gets a
 * queue via `createJobQueue`. Once all jobs are handled, the Arena UI is
 * mounted on `app`.
 *
 * @param {object} app object exposing `use`, handed to `setupArena`
 * @param {string} redisURL Redis connection URL shared by all queues
 * @param {string} mongoDBName name of the Mongo database to read job types from
 * @returns {Promise<void>}
 */
export var InitiateJobs = async function(app, redisURL, mongoDBName) {
  await Connection.connectToMongo();
  const jobTypes = await Connection.client
    .db(mongoDBName)
    .collection("SysJobType")
    .find()
    .toArray();

  for (const job of jobTypes) {
    if (job.IsEnabled === false) {
      // No queue is created for a disabled job; it is only registered so it can be shown.
      AllQueues[job.name] = {};
      AllQueues[job.name].status = "Disabled";
      continue;
    }

    const {
      name,
      queueOptions,
      autoGenerate,
      JobData,
      JobOptions,
      parallelJobCount
    } = job;
    // The processor is stored as a path relative to this module's directory.
    const processorPath = path.join(__dirname, `${job.processor}`);
    createJobQueue(
      name,
      redisURL,
      queueOptions,
      autoGenerate,
      JobData,
      JobOptions,
      parallelJobCount,
      processorPath
    );
  }

  setupArena(app, redisURL);
};
/**
 * Creates the Bull queue for one job type, registers it in `AllQueues`,
 * optionally enqueues its auto-generated job, attaches the processor and wires
 * the logging / status listeners.
 *
 * Any synchronous failure is logged and swallowed; the function then returns
 * `undefined` instead of throwing.
 *
 * @param {string} name
 * @param {string} redisURL
 * @param {Queue.QueueOptions} queueOptions
 * @param {boolean} autoGenerate
 * @param {any} jobData
 * @param {Queue.JobOptions} jobOptions
 * @param {number} parallelJobCount
 * @param {string | ((job: Queue.Job<any>, done: Queue.DoneCallback) => void) | ((job: Queue.Job<any>) => Promise<any>)} processor
 * @returns {Queue.Queue | undefined} the new queue, or undefined when setup failed
 */
function createJobQueue(
  name,
  redisURL,
  queueOptions,
  autoGenerate,
  jobData,
  jobOptions,
  parallelJobCount,
  processor
) {
  try {
    // NOTE(review): redisURL is logged verbatim; if it embeds credentials they
    // end up in the log output. Consider redacting.
    logger.info(
      `adding job ${JSON.stringify({
        name,
        redisURL,
        queueOptions,
        autoGenerate,
        jobData,
        jobOptions,
        parallelJobCount,
        processor
      })}`
    );

    // Register the entry before constructing the queue so a constructor
    // failure still leaves a (status-less) entry behind, as before.
    AllQueues[name] = {};
    // NOTE(review): `redis` here replaces any `queueOptions.redis` wholesale —
    // confirm that discarding caller-supplied redis options is intended.
    const queue = new Queue(name, redisURL, {
      ...queueOptions,
      redis: { showFriendlyErrorStack: true }
    });
    AllQueues[name].Q = queue;

    // A job can be auto generated, like repeatable jobs.
    if (autoGenerate === true) {
      // add() is asynchronous: without a handler a rejection would surface as
      // an unhandled promise rejection instead of a log entry.
      queue.add(jobData, jobOptions).catch(err => {
        logger.error(`failed to auto generate job for queue ${name}: %o`, err);
      });
    }

    // A missing or malformed count would coerce to NaN (or a fraction /
    // negative); fall back to a single worker in those cases.
    // Presumably Bull expects a positive integer here — TODO confirm.
    const requestedCount = Number(parallelJobCount);
    const concurrency =
      Number.isInteger(requestedCount) && requestedCount > 0
        ? requestedCount
        : 1;

    // Attach the processor; guard the returned value in case it is a promise
    // so that an asynchronous failure is logged rather than left unhandled.
    const processing = queue.process(concurrency, processor);
    if (processing && typeof processing.catch === "function") {
      processing.catch(err => {
        logger.error(`processing for queue ${name} stopped: %o`, err);
      });
    }

    // Initial status.
    AllQueues[name].status = "Running";

    queue.on("failed", (job, err) => {
      logger.error(`job ${JSON.stringify(job)} failed details: %o`, err);
    });
    queue.on("error", err => {
      logger.error(`uncaught error in queue %o`, err);
    });
    // Keep the registry status in sync with pause / resume events.
    queue.on("paused", () => {
      AllQueues[name].status = "Paused";
      logger.info(`queue ${name} is paused`);
    });
    queue.on("resumed", () => {
      AllQueues[name].status = "Running";
      logger.info(`queue ${name} is resumed`);
    });

    return queue;
  } catch (error) {
    logger.error("failed to add job %o", error);
    return undefined;
  }
}
/**
 * Pauses the registered queue called `queueName`.
 *
 * Calls the queue's `pause` with `false` as its only argument, logs whatever
 * it resolves to and returns that value. Any failure (including an unknown
 * queue name) is logged and turned into `undefined`.
 *
 * @param {string} queueName key of the queue in the registry
 * @returns {Promise<any>} the value `pause` resolved to, or undefined on failure
 */
export var pauseQueue = async function(queueName) {
  let outcome;
  try {
    const entry = AllQueues[queueName];
    outcome = await entry.Q.pause(false);
    logger.info(outcome);
  } catch (failure) {
    logger.error("failed to pause queue %o", failure);
    return undefined;
  }
  return outcome;
};
/**
 * Resumes the registered queue called `queueName`.
 *
 * Logs whatever the queue's `resume` resolves to and returns that value. Any
 * failure (including an unknown queue name) is logged and turned into
 * `undefined`.
 *
 * @param {string} queueName key of the queue in the registry
 * @returns {Promise<any>} the value `resume` resolved to, or undefined on failure
 */
export var resumeQueue = async function(queueName) {
  let outcome;
  try {
    const entry = AllQueues[queueName];
    outcome = await entry.Q.resume();
    logger.info(outcome);
  } catch (failure) {
    logger.error("failed to resume queue %o", failure);
    return undefined;
  }
  return outcome;
};
/**
 * Fetches the jobs of a queue that are in a given state.
 *
 * The state name is capitalised and prefixed with "get" to pick the queue
 * getter to call (e.g. "failed" -> `getFailed`), which receives the range.
 *
 * @param {string} queueName key of the queue in the registry
 * @param {string} status job state; only its first character is upper-cased
 * @param {number} [start] first index of the range, defaults to 0
 * @param {number} [end] last index of the range, defaults to -1
 * @returns {Promise<any>} whatever the selected getter resolves to
 * @throws {TypeError} when `status` is not a non-empty string, the queue is
 *   not registered with a `Q`, or no matching getter exists on it
 */
export var getJobs = async function(queueName, status, start, end) {
  logger.debug("%o", { queueName, status, start, end });

  if (typeof status !== "string" || status.length === 0) {
    throw new TypeError("status must be a non-empty string");
  }

  const rangeStart = start || 0;
  // A numeric end of 0 is a legitimate bound; `end || -1` used to turn it
  // into -1. Every other falsy value still maps to -1.
  const rangeEnd = end === 0 ? 0 : end || -1;

  // Capitalise the first character to build the getter name.
  const getterName = `get${status.charAt(0).toUpperCase()}${status.slice(1)}`;

  const entry = AllQueues[queueName];
  if (!entry || !entry.Q) {
    throw new TypeError(`queue "${queueName}" has no active Bull queue`);
  }
  if (typeof entry.Q[getterName] !== "function") {
    throw new TypeError(`unsupported job status "${status}"`);
  }

  const jobs = await entry.Q[getterName](rangeStart, rangeEnd);
  return jobs;
};
/**
 * Mounts the Arena monitoring UI on `app` for every registered queue that has
 * a status other than "Disabled".
 *
 * @param {object} app object exposing `use`; the Arena router is mounted at "/"
 * @param {string} redisURL Redis connection URL, converted per queue by getRedisConfig
 */
function setupArena(app, redisURL) {
  const monitoredQueues = Object.keys(AllQueues)
    .filter(queueName => {
      const { status } = AllQueues[queueName];
      // Entries without any status, or flagged as disabled, are not monitored.
      return Boolean(status) && status !== "Disabled";
    })
    .map(queueName => ({
      name: queueName,
      hostId: "MMNet",
      redis: getRedisConfig(redisURL)
    }));

  const arenaConfig = { queues: monitoredQueues };
  const listenOptions = {
    basePath: "/arena",
    disableListen: true,
    useCdn: false
  };

  app.use("/", Arena(arenaConfig, listenOptions));
}
// helper
/**
 * Breaks a Redis connection URL into separate connection fields.
 *
 * Missing parts fall back to host "localhost", port 6379 and database "0".
 * The password is everything after the FIRST ":" of the URL's auth section,
 * so passwords that themselves contain ":" are kept intact; an auth section
 * without ":" yields no password.
 *
 * @param {string} redisUrl
 * @returns {{host: string, port: number, database: string, password: (string|undefined)}}
 */
function getRedisConfig(redisUrl) {
  const { hostname, port, pathname, auth } = url.parse(redisUrl);

  let password;
  if (auth) {
    // split(":")[1] would cut the password at its own first colon.
    const separatorIndex = auth.indexOf(":");
    if (separatorIndex !== -1) {
      password = auth.slice(separatorIndex + 1);
    }
  }

  return {
    host: hostname || "localhost",
    port: Number(port || 6379),
    // Drop the leading "/" of the path; an empty remainder means database "0".
    // NOTE(review): ioredis names this option `db` — confirm the consumer of
    // this object really reads `database`.
    database: (pathname || "/0").slice(1) || "0",
    password
  };
}
/**
 * Exposes the module's queue registry.
 *
 * @returns {object} the live `AllQueues` object itself (not a copy), so
 *   changes made by the caller are visible to this module
 */
export var GetAllQueues = function() {
  const registry = AllQueues;
  return registry;
};
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement