Advertisement
Guest User

Untitled

a guest
Apr 21st, 2019
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.70 KB | None | 0 0
  1. "use strict";
  2. import Queue from "bull";
  3. import Connection from "./DB/mongoDB";
  4. import path from "path";
  5. import url from "url";
  6. import Arena from "bull-arena";
  7. import logger from "./Helpers/logger";
  8.  
  9. // Module-level registry of every known job queue, keyed by queue name: { [qName]: { Q: Bull Queue, status: "Running" | "Paused" | "Disabled" } }. Entries for disabled jobs carry only a status and no Q.
  10. const AllQueues = {};
  11.  
  /**
   * Boots the job system: connects to MongoDB, reads every document of the
   * "SysJobType" collection in the given database, registers one queue per
   * job, and finally mounts the Arena monitoring UI on the app.
   *
   * Jobs whose IsEnabled field is exactly `false` get no queue; they are only
   * recorded in AllQueues with status "Disabled" so they can still be listed.
   *
   * @param {any} app - application object exposing `use` (forwarded to setupArena)
   * @param {string} redisURL - Redis connection URL shared by every queue
   * @param {string} mongoDBName - name of the MongoDB database holding SysJobType
   * @returns {Promise<void>}
   */
  export var InitiateJobs = async function(app, redisURL, mongoDBName) {
    await Connection.connectToMongo();
    const jobTypes = await Connection.client
      .db(mongoDBName)
      .collection("SysJobType")
      .find()
      .toArray();

    for (const jobType of jobTypes) {
      if (jobType.IsEnabled === false) {
        // Placeholder entry only: no Bull queue is created for a disabled job.
        AllQueues[jobType.name] = { status: "Disabled" };
        continue;
      }

      // The processor is stored as a path relative to this file's directory.
      const processorPath = path.join(__dirname, `${jobType.processor}`);
      createJobQueue(
        jobType.name,
        redisURL,
        jobType.queueOptions,
        jobType.autoGenerate,
        jobType.JobData,
        jobType.JobOptions,
        jobType.parallelJobCount,
        processorPath
      );
    }

    // Arena is configured last so it sees every queue registered above.
    setupArena(app, redisURL);
  };
  38.  
  /**
   * Masks the credentials section of a connection URL so it can be logged.
   * Values that are not strings are returned untouched.
   *
   * @param {any} redisURL
   * @returns {any}
   */
  function maskRedisCredentials(redisURL) {
    if (typeof redisURL !== "string") {
      return redisURL;
    }
    // Greedy up to the last "@" before the first "/" so a password that itself
    // contains "@" is still fully hidden.
    return redisURL.replace(/:\/\/[^/]*@/, "://***@");
  }

  /**
   * Coerces the configured parallel job count to a usable concurrency value.
   * Anything that is not a non-negative integer (for example a missing field,
   * which coerces to NaN) falls back to 1.
   *
   * @param {any} parallelJobCount
   * @returns {number}
   */
  function toConcurrency(parallelJobCount) {
    const count = Number(parallelJobCount);
    return Number.isInteger(count) && count >= 0 ? count : 1;
  }

  /**
   * Creates the Bull queue for one job type, registers it in AllQueues, attaches
   * the processor and the logging/status event handlers.
   *
   * @param {string} name
   * @param {string} redisURL
   * @param {Queue.QueueOptions} queueOptions
   * @param {boolean} autoGenerate
   * @param {any} jobData
   * @param {Queue.JobOptions} jobOptions
   * @param {number} parallelJobCount
   * @param {string | ((job: Queue.Job<any>, done: Queue.DoneCallback) => void) | ((job: Queue.Job<any>) => Promise<any>)} processor
   * @returns {Queue.Queue | undefined} the created queue, or undefined when
   *   setup threw (the error is logged, not rethrown)
   */
  function createJobQueue(
    name,
    redisURL,
    queueOptions,
    autoGenerate,
    jobData,
    jobOptions,
    parallelJobCount,
    processor
  ) {
    try {
      logger.info(
        `adding job ${JSON.stringify({
          name,
          // The URL may embed a password; never write it to the log verbatim.
          redisURL: maskRedisCredentials(redisURL),
          queueOptions,
          autoGenerate,
          jobData,
          jobOptions,
          parallelJobCount,
          processor
        })}`
      );

      AllQueues[name] = {};
      const queue = new Queue(name, redisURL, {
        ...queueOptions,
        redis: { showFriendlyErrorStack: true }
      });
      AllQueues[name].Q = queue;

      // A job can be auto generated, e.g. repeatable jobs.
      if (autoGenerate === true) {
        // add() is asynchronous; without a handler a rejection would surface
        // as an unhandled promise rejection instead of a log entry.
        queue.add(jobData, jobOptions).catch(error => {
          logger.error("failed to add job %o", error);
        });
      }

      const concurrency = toConcurrency(parallelJobCount);
      if (concurrency !== Number(parallelJobCount)) {
        logger.info(
          `queue ${name}: parallelJobCount is not a valid count, using ${concurrency}`
        );
      }

      // Attach the processor. The result is wrapped so a rejected start-up is
      // logged rather than left as a floating promise.
      Promise.resolve(queue.process(concurrency, processor)).catch(error => {
        logger.error("failed to start processing queue %o", error);
      });

      // Initial status; kept in sync by the paused/resumed handlers below.
      AllQueues[name].status = "Running";

      queue.on("failed", (job, err) => {
        logger.error(`job ${JSON.stringify(job)} failed details: %o`, err);
      });
      queue.on("error", err => {
        logger.error(`uncaught error in queue %o`, err);
      });
      queue.on("paused", () => {
        AllQueues[name].status = "Paused";
        logger.info(`queue ${name} is paused`);
      });
      queue.on("resumed", () => {
        AllQueues[name].status = "Running";
        logger.info(`queue ${name} is resumed`);
      });

      return queue;
    } catch (error) {
      logger.error("failed to add job %o", error);
      return undefined;
    }
  }
  107.  
  /**
   * Pauses the named queue by calling its `pause(false)`.
   *
   * Any failure (including an unknown queue name or a disabled job that has no
   * queue object) is logged and reported as `undefined` rather than thrown.
   *
   * @param {string} queueName - key of the queue in AllQueues
   * @returns {Promise<any>} whatever `pause` resolves with, or undefined on failure
   */
  export var pauseQueue = async function(queueName) {
    let outcome;
    try {
      const { Q: queue } = AllQueues[queueName];
      outcome = await queue.pause(false);
      logger.info(outcome);
    } catch (failure) {
      logger.error("failed to pause queue %o", failure);
      return undefined;
    }
    return outcome;
  };
  118.  
  /**
   * Resumes the named queue by calling its `resume()`.
   *
   * Any failure (including an unknown queue name or a disabled job that has no
   * queue object) is logged and reported as `undefined` rather than thrown.
   *
   * @param {string} queueName - key of the queue in AllQueues
   * @returns {Promise<any>} whatever `resume` resolves with, or undefined on failure
   */
  export var resumeQueue = async function(queueName) {
    let outcome;
    try {
      const { Q: queue } = AllQueues[queueName];
      outcome = await queue.resume();
      logger.info(outcome);
    } catch (failure) {
      logger.error("failed to resume queue %o", failure);
      return undefined;
    }
    return outcome;
  };
  129.  
  /**
   * Fetches jobs of one status from the named queue.
   *
   * The status is turned into a getter name by capitalising its first
   * character and prefixing "get" (e.g. "failed" -> "getFailed"); that method
   * is then invoked on the queue with the requested range.
   *
   * @param {string} queueName - key of the queue in AllQueues
   * @param {string} status - job status used to pick the queue getter
   * @param {number} [start] - first index; falsy values become 0
   * @param {number} [end] - last index; falsy values become -1
   * @returns {Promise<any>} result of the selected queue getter
   */
  export var getJobs = async function(queueName, status, start, end) {
    logger.debug("%o", { queueName, status, start, end });
    const rangeStart = start || 0;
    const rangeEnd = end || -1;
    const getterName = `get${status.charAt(0).toUpperCase()}${status.slice(1)}`;
    return await AllQueues[queueName].Q[getterName](rangeStart, rangeEnd);
  };
  140.  
  /**
   * Mounts the Arena UI on the app for every registered queue that has a
   * status other than "Disabled".
   *
   * @param {any} app - application object exposing `use`
   * @param {string} redisURL - Redis connection URL, parsed once per monitored queue
   */
  function setupArena(app, redisURL) {
    const monitoredQueues = Object.entries(AllQueues)
      // Entries without a status, or explicitly disabled, are not monitored.
      .filter(([, entry]) => Boolean(entry.status) && entry.status !== "Disabled")
      .map(([qName]) => ({
        name: qName,
        hostId: "MMNet",
        redis: getRedisConfig(redisURL)
      }));

    const arenaConfig = { queues: monitoredQueues };
    const arenaListenOptions = {
      basePath: "/arena",
      disableListen: true,
      useCdn: false
    };

    app.use("/", Arena(arenaConfig, arenaListenOptions));
  }
  166.  
  // helper
  /**
   * Breaks a Redis connection URL into discrete connection settings.
   *
   * Missing parts fall back to host "localhost", port 6379 and database "0".
   * The password is everything after the first ":" of the URL's auth section,
   * so passwords that themselves contain ":" are preserved in full; when the
   * auth section is absent or has no ":", the password is undefined.
   *
   * NOTE(review): the database is returned under the key `database`; confirm
   * that the consumer of this config reads that key rather than `db`.
   *
   * @param {string} redisUrl
   * @returns {{host: string, port: number, database: string, password: (string|undefined)}}
   */
  function getRedisConfig(redisUrl) {
    const { hostname, port, pathname, auth } = url.parse(redisUrl);
    // Split on the first ":" only; a plain split(":")[1] would cut the
    // password short at its first embedded colon.
    const separatorIndex = auth ? auth.indexOf(":") : -1;
    return {
      host: hostname || "localhost",
      port: Number(port || 6379),
      // Drop the leading "/" of the path; an empty remainder means database 0.
      database: (pathname || "/0").slice(1) || "0",
      password: separatorIndex === -1 ? undefined : auth.slice(separatorIndex + 1)
    };
  }
  180.  
  /**
   * Exposes the module's queue registry.
   *
   * The live object is returned, not a copy, so later status changes and newly
   * registered queues are visible through the same reference.
   *
   * @returns {Object} map of queue name to its registry entry
   */
  export var GetAllQueues = () => AllQueues;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement