Advertisement
Guest User

Untitled

a guest
May 6th, 2017
345
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /*eslint-env node */
  2. 'use strict';
  3.  
  4. var redis = require('ioredis');
  5. var EventEmitter = require('events');
  6.  
  7. var util = require('util');
  8. var url = require('url');
  9. var Job = require('./job');
  10. var scripts = require('./scripts');
  11. var errors = require('./errors');
  12.  
  13. var TimerManager = require('./timer-manager');
  14. var _ = require('lodash');
  15. var Promise = require('bluebird');
  16. var semver = require('semver');
  17. var debuglog = require('debuglog')('bull');
  18. var uuid = require('uuid');
  19.  
  20. var commands = require('./commands/');
  21.  
/**
  Gets or creates a new Queue with the given name.

  The Queue keeps 6 data structures:
    - wait (list)
    - active (list)
    - delayed (zset)
    - priority (zset)
    - completed (zset)
    - failed (zset)

        --> priorities      --> completed
       /     |            /
    job -> wait -> active
        |    ^            \
        v    |             --> failed
        delayed
*/
  40.  
/**
  Delayed jobs are jobs that cannot be executed until a certain time in
  ms has passed since they were added to the queue.
  The mechanism is simple: a delayedTimestamp variable holds the next
  known timestamp that is in the delayed set (or Number.MAX_VALUE if none).

  Whenever a sooner timestamp becomes known, a setTimeout is armed so that
  the delayed set is re-checked once that timestamp is reached. Timestamps
  further away than MAX_TIMEOUT_MS are not scheduled directly.
*/
// Oldest Redis server version the queue accepts. The Queue constructor compares
// the version reported by the server against this value and emits an 'error'
// event when the server is older.
var MINIMUM_REDIS_VERSION = '2.8.11';

// Upper bound (in ms from now) for a timestamp that updateDelayTimer will
// schedule with setTimeout.
var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed
  54.  
  55. /*
  56.   interface QueueOptions {
  57.     prefix?: string = 'bull',
  58.     redis : RedisOpts, // ioredis defaults
  59.     createClient?: (type: enum('client', 'subscriber'), redisOpts?: RedisOpts) => redisClient,
  60.  
  61.     // Advanced settings
  62.     settings?: QueueSettings {
  63.       lockDuration?: number = 30000,
  64.       lockRenewTime?: number = lockDuration / 2,
  65.       stalledInterval?: number = 30000,
  66.       maxStalledCount?: number = 1, // The maximum number of times a job can be recovered from the 'stalled' state
  67.       guardInterval?: number = 5000,
  68.       retryProcessDelay?: number = 5000
  69.     }
  70.   }
  71. */
  72.  
  73. // Queue(name: string, url?, opts?)
  74. var Queue = function Queue(name, url, opts){
  75.   var _this = this;
  76.   if(!(this instanceof Queue)){
  77.     return new Queue(name, url, opts);
  78.   }
  79.  
  80.   if(_.isString(url)){
  81.     opts = _.defaults(redisOptsFromUrl, opts);
  82.   }else{
  83.     opts = url;
  84.   }
  85.  
  86.   opts = opts || {};
  87.  
  88.   if(opts && !_.isObject(opts)){
  89.     throw Error('Options must be a valid object');
  90.   }
  91.  
  92.   var redisOpts = opts.redis || {};
  93.  
  94.   _.defaults(redisOpts, {
  95.     port: 6379,
  96.     host: '127.0.0.1',
  97.     db: redisOpts.db || redisOpts.DB,
  98.     retryStrategy: function (times) {
  99.       var delay = Math.min(Math.exp(times), 20000);
  100.       return delay;
  101.     }
  102.   });
  103.  
  104.   function createClient(type, redisOpts) {
  105.     var client;
  106.     if(_.isFunction(opts.createClient)){
  107.       client = opts.createClient(type, redisOpts);
  108.     }else{
  109.       client = new redis(redisOpts);
  110.     }
  111.     return client;
  112.   }
  113.  
  114.   this.name = name;
  115.   this.keyPrefix = redisOpts.keyPrefix || opts.prefix || 'bull';
  116.   this.token = uuid();
  117.  
  118.   //
  119.   // We cannot use ioredis keyPrefix feature since we
  120.   // create keys dynamically in lua scripts.
  121.   //
  122.   delete redisOpts.keyPrefix;
  123.  
  124.   //
  125.   // Create queue client (used to add jobs, pause queues, etc);
  126.   //
  127.   this.client = createClient('client', redisOpts);
  128.  
  129.   getRedisVersion(this.client).then(function(version){
  130.     if(semver.lt(version, MINIMUM_REDIS_VERSION)){
  131.       throw new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. Current: ' + version);
  132.     }
  133.   }).catch(function(err){
  134.     _this.emit('error', err);
  135.   });
  136.  
  137.   //
  138.   // Create event subscriber client (receive messages from other instance of the queue)
  139.   //
  140.   this.eclient = createClient('subscriber', redisOpts);
  141.  
  142.   this.handlers = {};
  143.   this.delayTimer = null;
  144.   this.processing = [];
  145.   this.retrieving = 0;
  146.  
  147.   this.settings = _.defaults(opts.settings, {
  148.     lockDuration: 30000,
  149.     stalledInterval: 30000,
  150.     maxStalledCount: 1,
  151.     guardInterval: 5000,
  152.     retryProcessDelay: 5000
  153.   });
  154.  
  155.   this.settings.lockRenewTime = this.settings.lockRenewTime || this.settings.lockDuration / 2;
  156.  
  157.   // bubble up Redis error events
  158.   [this.client, this.eclient].forEach(function (client) {
  159.     client.on('error', _this.emit.bind(_this, 'error'));
  160.   });
  161.  
  162.   this.on('error', function(){
  163.     // Dummy handler to avoid process to exit with an unhandled exception.
  164.   });
  165.    
  166.   // keeps track of active timers. used by close() to
  167.   // ensure that disconnect() is deferred until all
  168.   // scheduled redis commands have been executed
  169.   this.timers = new TimerManager();
  170.  
  171.   //
  172.   // Init
  173.   //
  174.   this._init(name);
  175.  
  176.   //
  177.   // Only setup listeners if .on/.addEventListener called, or process function defined.
  178.   //
  179.   this._setupQueueEventListeners();
  180.  
  181.   this.delayedTimestamp = Number.MAX_VALUE;
  182.   this.isReady().then(function(){
  183.     // TODO: These are only useful if a process function has been defined.
  184.  
  185.     //
  186.     // Init delay timestamp.
  187.     //
  188.     scripts.updateDelaySet(_this, Date.now()).then(function(timestamp){
  189.       if(timestamp){
  190.         _this.updateDelayTimer(timestamp);
  191.       }
  192.     });
  193.  
  194.     //
  195.     // Create a guardian timer to revive delayTimer if necessary
  196.     // This is necessary when redis connection is unstable, which can cause the pub/sub to fail
  197.     //
  198.     _this.guardianTimer = setGuardianTimer(_this);
  199.   });
  200.  
  201.   this.errorRetryTimer = {};
  202.  
  203.   // Bind these methods to avoid constant rebinding and/or creating closures
  204.   // in processJobs etc.
  205.   this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this);
  206.   this.processJob = this.processJob.bind(this);
  207.   this.getJobFromId = Job.fromId.bind(null, this);
  208. };
  209.  
  210. function redisOptsFromUrl(urlString){
  211.   var redisOpts = {};
  212.   try {
  213.     var redisUrl = url.parse(urlString);
  214.     redisOpts.port = redisUrl.port;
  215.     redisOpts.host = redisUrl.hostname;
  216.     if (redisUrl.auth) {
  217.       redisOpts.password = redisUrl.auth.split(':')[1];
  218.     }
  219.   } catch (e) {
  220.     throw new Error(e.message);
  221.   }
  222.   return redisOpts;
  223. }
  224.  
  225. function setGuardianTimer(queue){
  226.   return setInterval(function() {
  227.     if(queue.delayedTimestamp < Date.now() || queue.delayedTimestamp - Date.now() > queue.settings.guardInterval){
  228.       scripts.updateDelaySet(queue, Date.now()).then(function(timestamp){
  229.         if(timestamp){
  230.           queue.updateDelayTimer(timestamp);
  231.         }
  232.       }).catch(function(err){
  233.         queue.emit('error', err);
  234.       });
  235.     }
  236.     //
  237.     // Trigger a getNextJob (if worker is idling)
  238.     //
  239.     queue.emit('added');
  240.   }, queue.settings.guardInterval);
  241. }
  242.  
// Queue instances are event emitters: Queue.prototype inherits from
// EventEmitter.prototype.
util.inherits(Queue, EventEmitter);

// `off` is an alias for the inherited removeListener.
Queue.prototype.off = Queue.prototype.removeListener;
  246.  
  247. Queue.prototype._init = function(name){
  248.   var _this = this;
  249.   var initializers = [this.client, this.eclient].map(function (client) {
  250.     var _resolve, errorHandler;
  251.     return new Promise(function(resolve, reject) {
  252.       _resolve = resolve;
  253.       errorHandler = function(err){
  254.         if(err.code !== 'ECONNREFUSED'){
  255.           reject(err);
  256.         }
  257.       };
  258.       client.once('ready', resolve);
  259.       client.on('error', errorHandler);
  260.     }).finally(function(){
  261.       client.removeListener('ready', _resolve);
  262.       client.removeListener('error', errorHandler);
  263.     });
  264.   });
  265.  
  266.   this._initializing = Promise.all(initializers).then(function(){
  267.     return _this.eclient.psubscribe(_this.toKey('') + '*');
  268.   }).then(function(){
  269.     return commands(_this.client);
  270.   }).then(function(){
  271.     debuglog(name + ' queue ready');
  272.   }, function(err){
  273.     _this.emit('error', err, 'Error initializing queue');
  274.     throw err;
  275.   });
  276. };
  277.  
  278. Queue.prototype._setupQueueEventListeners = function(){
  279.   /*
  280.     if(eventName !== 'cleaned' && eventName !== 'error'){
  281.       args[0] = Job.fromJSON(_this, args[0]);
  282.     }
  283.   */
  284.   var _this = this;
  285.   var activeKey = _this.toKey('active');
  286.   var progressKey = _this.toKey('progress');
  287.   var delayedKey = _this.toKey('delayed');
  288.   var pausedKey = _this.toKey('paused');
  289.   var resumedKey = _this.toKey('resumed');
  290.   var addedKey = _this.toKey('added');
  291.   var completedKey = _this.toKey('completed');
  292.   var failedKey = _this.toKey('failed');
  293.  
  294.   this.eclient.on('pmessage', function(channel, pattern, message){
  295.     var keyAndToken = pattern.split('@');
  296.     var key = keyAndToken[0];
  297.     var token = keyAndToken[1];
  298.  
  299.     switch(key){
  300.       case activeKey:
  301.         _this.emit('global:active', message, 'waiting');
  302.         break;
  303.       case progressKey:
  304.         var jobAndProgress = message.split(':');
  305.         _this.emit('global:progress', jobAndProgress[0], jobAndProgress[1]);
  306.         break;
  307.       case delayedKey:
  308.         _this.updateDelayTimer(message);
  309.         break;
  310.       case pausedKey:
  311.       case resumedKey:
  312.         _this.emit('global:' + message);
  313.         break;
  314.       case addedKey:
  315.         _this.emit('added', message);
  316.         if(_this.token === token){
  317.           _this.emit('waiting', message, null);
  318.         }
  319.         token && _this.emit('global:waiting', message, null);
  320.         break;
  321.       case completedKey:
  322.         var data = JSON.parse(message);
  323.         var job = Job.fromJSON(_this, data.job);
  324.         _this.emit('global:completed', job, data.val, 'active');
  325.         break;
  326.       case failedKey:
  327.         var data = JSON.parse(message);
  328.         var job = Job.fromJSON(_this, data.job);
  329.         _this.emit('global:failed', job, data.val, 'active');
  330.         break;
  331.     }
  332.   });
  333. };
  334.  
// Static alias for the Messages object exported by ./errors.
Queue.ErrorMessages = errors.Messages;
  336.  
  337. Queue.prototype.isReady = function(){
  338.   var _this = this;
  339.   return this._initializing.then(function(){
  340.     return _this;
  341.   });
  342. };
  343.  
  344. Queue.prototype.whenCurrentMoveFinished = function(){
  345.   var currentMove = this.client.commandQueue.peekFront();
  346.   return currentMove && currentMove.command.promise || Promise.resolve();
  347. };
  348.  
  349. Queue.prototype.disconnect = function(){
  350.   var clients = [this.client, this.eclient].filter(function(client){
  351.     return client.status === 'ready';
  352.   });
  353.  
  354.   var ended = new Promise(function(resolve){
  355.     var resolver = _.after(clients.length, resolve);
  356.     clients.forEach(function(client){
  357.       client.once('end', resolver);
  358.     });
  359.   });
  360.   return Promise.all(clients.map(function(client){
  361.     return client.quit();
  362.   })).then(function(){
  363.     return ended;
  364.   });
  365. };
  366.  
  367. Queue.prototype.close = function( doNotWaitJobs ){
  368.   var _this = this;
  369.  
  370.   if(this.closing){
  371.     return this.closing;
  372.   }
  373.  
  374.   return this.closing = this.isReady().then(function(){
  375.     _.each(_this.errorRetryTimer, function(timer){
  376.       clearTimeout(timer);
  377.     });
  378.     clearTimeout(_this.delayTimer);
  379.     clearInterval(_this.guardianTimer);
  380.     clearInterval(_this.moveUnlockedJobsToWaitInterval);
  381.     _this.timers.clearAll();
  382.     return _this.timers.whenIdle().then(function(){
  383.       return _this.pause(true, doNotWaitJobs);
  384.     }).then(function(){
  385.       return _this.disconnect();
  386.     }).then(function(){
  387.       _this.closed = true;
  388.     });
  389.   });
  390. };
  391.  
  392. /**
  393.   Processes a job from the queue. The callback is called for every job that
  394.   is dequeued.
  395.  
  396.   Deprecate in favor of:
  397.  
  398.   /*
  399.   queue.work('export', opts, function(job, input){
  400.  
  401.     return output;
  402.   }, 'adrapid-export-results');
  403.  
  404.   @method process
  405. */
  406. Queue.prototype.process = function(name, concurrency, handler){
  407.   if(typeof name !== 'string'){
  408.     handler = concurrency;
  409.     concurrency = name;
  410.     name = Job.DEFAULT_JOB_NAME;
  411.   }
  412.  
  413.   if(typeof concurrency === 'function'){
  414.     handler = concurrency;
  415.     concurrency = 1;
  416.   }
  417.  
  418.   this.setHandler(name, handler);
  419.  
  420.   var _this = this;
  421.   return this.isReady().then(function(){
  422.     return _this.start(concurrency);
  423.   });
  424. };
  425.  
  426. Queue.prototype.start = function(concurrency){
  427.   var _this = this;
  428.   return this.run(concurrency).catch(function(err){
  429.     _this.emit('error', err, 'error running queue');
  430.     throw err;
  431.   });
  432. };
  433.  
  434. Queue.prototype.setHandler = function(name, handler){
  435.   if(this.handlers[name]) {
  436.     throw new Error('Cannot define the same handler twice ' + name);
  437.   }
  438.  
  439.   handler = handler.bind(this);
  440.  
  441.   if(handler.length > 1){
  442.     this.handlers[name] = Promise.promisify(handler);
  443.   }else{
  444.     this.handlers[name] = Promise.method(handler);
  445.   }
  446. };
  447.  
  448. /**
  449. interface JobOptions
  450. {
  451.   attempts: number;
  452. }
  453. */
  454.  
  455. /**
  456.   Adds a job to the queue.
  457.   @method add
  458.   @param data: {} Custom data to store for this job. Should be JSON serializable.
  459.   @param opts: JobOptions Options for this job.
  460. */
  461. Queue.prototype.add = function(name, data, opts){
  462.   return Job.create(this, name, data, opts);
  463. };
  464.  
  465. /**
  466.   Returns the number of jobs waiting to be processed.
  467. */
  468. Queue.prototype.count = function(){
  469.   var multi = this.multi();
  470.   multi.llen(this.toKey('wait'));
  471.   multi.llen(this.toKey('paused'));
  472.   multi.zcard(this.toKey('delayed'));
  473.  
  474.   return multi.exec().then(function(res){
  475.     return Math.max(res[0][1], res[1][1]) + res[2][1];
  476.   });
  477. };
  478.  
  479. /**
  480.   Empties the queue.
  481.  
  482.   Returns a promise that is resolved after the operation has been completed.
  483.   Note that if some other process is adding jobs at the same time as emptying,
  484.   the queues may not be really empty after this method has executed completely.
  485.   Also, if the method does error between emptying the lists and removing all the
  486.   jobs, there will be zombie jobs left in redis.
  487.  
  488.   TODO: Use EVAL to make this operation fully atomic.
  489. */
  490. Queue.prototype.empty = function(){
  491.   var _this = this;
  492.  
  493.   // Get all jobids and empty all lists atomically.
  494.   var multi = this.multi();
  495.  
  496.   multi.lrange(this.toKey('wait'), 0, -1);
  497.   multi.lrange(this.toKey('paused'), 0, -1);
  498.   multi.del(this.toKey('wait'));
  499.   multi.del(this.toKey('paused'));
  500.   multi.del(this.toKey('meta-paused'));
  501.   multi.del(this.toKey('delayed'));
  502.  
  503.   return multi.exec().spread(function(waiting, paused){
  504.     waiting = waiting[1];
  505.     paused = paused[1];
  506.     var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this);
  507.  
  508.     if(jobKeys.length){
  509.       multi = _this.multi();
  510.  
  511.       multi.del.apply(multi, jobKeys);
  512.       return multi.exec();
  513.     }
  514.   });
  515. };
  516.  
  517. /**
  518.   Pauses the processing of this queue, locally if true passed, otherwise globally.
  519.  
  520.   For global pause, we use an atomic RENAME operation on the wait queue. Since
  521.   we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
  522.   is renamed to 'paused', no new jobs will be processed (the current ones
  523.   will run until finalized).
  524.  
  525.   Adding jobs requires a LUA script to check first if the paused list exist
  526.   and in that case it will add it there instead of the wait list.
  527. */
  528. Queue.prototype.pause = function(isLocal, doNotWaitActive){
  529.   var _this = this;
  530.   return _this.isReady().then(function(){
  531.     if(isLocal){
  532.       if(!_this.paused){
  533.         _this.paused = new Promise(function(resolve) {
  534.           _this.resumeLocal = function() {
  535.             resolve();
  536.             _this.paused = null; // Allow pause to be checked externally for paused state.
  537.           };
  538.         });
  539.       }
  540.       return !doNotWaitActive && _this.whenCurrentJobsFinished();
  541.     }else{
  542.       return scripts.pause(_this, true);
  543.     }
  544.   }).then(function(){
  545.     _this.emit('paused');
  546.   });
  547. };
  548.  
  549. Queue.prototype.resume = function(isLocal /* Optional */){
  550.   var _this = this;
  551.   return this.isReady().then(function(){
  552.     if(isLocal){
  553.       if(_this.resumeLocal){
  554.         _this.resumeLocal();
  555.       }
  556.     }else{
  557.       return scripts.pause(_this, false);
  558.     }
  559.   }).then(function(){
  560.     _this.emit('resumed');
  561.   });
  562. };
  563.  
  564. Queue.prototype.run = function(concurrency){
  565.   var promises = [];
  566.   var _this = this;
  567.  
  568.   return this.moveUnlockedJobsToWait().then(function(){
  569.     while(concurrency--){
  570.       promises.push(new Promise(function(resolve){
  571.         _this.processJobs(concurrency, resolve);
  572.       }));
  573.     }
  574.  
  575.     _this.startMoveUnlockedJobsToWait();
  576.  
  577.     return Promise.all(promises);
  578.   });
  579. };
  580.  
  581. // ---------------------------------------------------------------------
  582. // Private methods
  583. // ---------------------------------------------------------------------
  584.  
  585. /**
  586.   This function updates the delay timer, which is a timer that timeouts
  587.   at the next known delayed job.
  588. */
  589. Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
  590.   var _this = this;
  591.   if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now())){
  592.     clearTimeout(this.delayTimer);
  593.     this.delayedTimestamp = newDelayedTimestamp;
  594.  
  595.     var nextDelayedJob = newDelayedTimestamp - Date.now();
  596.     nextDelayedJob = nextDelayedJob < 0 ? 0 : nextDelayedJob;
  597.  
  598.     this.delayTimer = setTimeout(function(){
  599.       scripts.updateDelaySet(_this, _this.delayedTimestamp).then(function(nextTimestamp){
  600.         if(nextTimestamp){
  601.           nextTimestamp = nextTimestamp < Date.now() ? Date.now() : nextTimestamp;
  602.         }else{
  603.           nextTimestamp = Number.MAX_VALUE;
  604.         }
  605.         _this.updateDelayTimer(nextTimestamp);
  606.       }).catch(function(err){
  607.         _this.emit('error', err, 'Error updating the delay timer');
  608.       });
  609.       _this.delayedTimestamp = Number.MAX_VALUE;
  610.     }, nextDelayedJob);
  611.   }
  612. };
  613.  
  614. /**
  615.  * Process jobs that have been added to the active list but are not being
  616.  * processed properly. This can happen due to a process crash in the middle
  617.  * of processing a job, leaving it in 'active' but without a job lock.
  618. */
  619. Queue.prototype.moveUnlockedJobsToWait = function(){
  620.   var _this = this;
  621.  
  622.   if(this.closed){
  623.     return Promise.resolve();
  624.   }
  625.  
  626.   return scripts.moveUnlockedJobsToWait(this).then(function(responses){
  627.     var handleFailedJobs = responses[0].map(function(jobId){
  628.       return _this.getJobFromId(jobId).then(function(job){
  629.         _this.emit('failed', job, new Error('job stalled more than allowable limit'), 'active' );
  630.         return null;
  631.       });
  632.     });
  633.     var handleStalledJobs = responses[1].map(function(jobId){
  634.       return _this.getJobFromId(jobId).then(function(job){
  635.         _this.emit('stalled', job);
  636.         return null;
  637.       });
  638.     });
  639.     return Promise.all(handleFailedJobs.concat(handleStalledJobs));
  640.   }).catch(function(err){
  641.     _this.emit('error', err, 'Failed to handle unlocked job in active');
  642.   });
  643. };
  644.  
  645. Queue.prototype.startMoveUnlockedJobsToWait = function() {
  646.   clearInterval(this.moveUnlockedJobsToWaitInterval);
  647.   if (this.settings.stalledInterval > 0){
  648.     this.moveUnlockedJobsToWaitInterval =
  649.       setInterval(this.moveUnlockedJobsToWait, this.settings.stalledInterval);
  650.   }
  651. };
  652.  
/**
  Processing loop for one concurrency slot.

  On the next tick: if the queue is closing, `resolve` is called with the
  closing promise and the loop ends. Otherwise the loop waits for a local
  pause (if any) to be lifted, fetches the next job, processes it and then
  re-enters itself. If fetching or processing fails, the loop is re-entered
  after settings.retryProcessDelay ms.

  @param index   Slot number; indexes `processing` and `errorRetryTimer`.
  @param resolve Callback invoked once when the loop stops.
*/
Queue.prototype.processJobs = function(index, resolve){
  var _this = this;
  // Re-entry point with the slot arguments already bound.
  var processJobs = this.processJobs.bind(this, index, resolve);
  process.nextTick(function(){
    if(!_this.closing){
      // `paused` is a promise while the queue is paused locally.
      (_this.paused || Promise.resolve()).then(function(){
        // Remember the in-flight promise so whenCurrentJobsFinished can wait on it.
        return _this.processing[index] = _this.getNextJob()
          .then(_this.processJob)
          .then(processJobs, function(/*err*/){
            //
            // Wait before trying to process again.
            //
            clearTimeout(_this.errorRetryTimer[index]);
            _this.errorRetryTimer[index] = setTimeout(function(){
              processJobs();
            }, _this.settings.retryProcessDelay);
          });
      }).catch(function(err){
        _this.emit('error', err, 'Error processing job');
      });
    }else{
      resolve(_this.closing);
    }
  });
};
  678.  
  679. Queue.prototype.processJob = function(job){
  680.   var _this = this;
  681.   var lockRenewId;
  682.   var timerStopped = false;
  683.  
  684.   if(!job){
  685.     return Promise.resolve();
  686.   }
  687.  
  688.   //
  689.   // There are two cases to take into consideration regarding locks.
  690.   // 1) The lock renewer fails to renew a lock, this should make this job
  691.   // unable to complete, since some other worker is also working on it.
  692.   // 2) The lock renewer is called more seldom than the check for stalled
  693.   // jobs, so we can assume the job has been stalled and is already being processed
  694.   // by another worker. See #308
  695.   //
  696.   var lockExtender = function(){
  697.     lockRenewId = _this.timers.set('lockExtender', _this.settings.lockRenewTime, function(){
  698.       scripts.extendLock(_this, job.id).then(function(lock){
  699.         if(lock && !timerStopped){
  700.           lockExtender();
  701.         }
  702.       }).catch(function(/*err*/){
  703.         // Somehow tell the worker this job should stop processing...
  704.       });
  705.     });
  706.   };
  707.  
  708.   var timeoutMs = job.opts.timeout;
  709.  
  710.   function stopTimer(){
  711.     timerStopped = true;
  712.     _this.timers.clear(lockRenewId);
  713.   }
  714.  
  715.   function handleCompleted(result){
  716.     try{
  717.       JSON.stringify(result);
  718.     }catch(err){
  719.       return handleFailed(err);
  720.     }
  721.  
  722.     return job.moveToCompleted(result).then(function(){
  723.       _this.emit('completed', job, result, 'active');
  724.     });
  725.   }
  726.  
  727.   function handleFailed(err){
  728.     var error = err.cause || err; //Handle explicit rejection
  729.  
  730.     // See https://github.com/OptimalBits/bull/pull/415#issuecomment-269744735
  731.     return job.moveToFailed(err).then(function(){
  732.       _this.emit('failed', job, error, 'active');
  733.     });
  734.   }
  735.  
  736.   lockExtender();
  737.   var handler = _this.handlers[job.name];
  738.   if(!handler){
  739.     return handleFailed(Error('Missing process handler for job type ' + job.name));
  740.   }else{
  741.     var jobPromise = handler(job);
  742.  
  743.     if(timeoutMs){
  744.       jobPromise = jobPromise.timeout(timeoutMs);
  745.     }
  746.  
  747.     // Local event with jobPromise so that we can cancel job.
  748.     // Probably we could have better ways to do this...
  749.     // For example, listen to a global event 'cancel'
  750.     _this.emit('active', job, jobPromise, 'waiting');
  751.  
  752.     return jobPromise.then(handleCompleted, handleFailed).finally(function(){
  753.       stopTimer();
  754.     });
  755.   }
  756. };
  757.  
  758. Queue.prototype.multi = function(){
  759.   return this.client.multi();
  760. };
  761.  
  762. /**
  763.   Returns a promise that resolves to the next job in queue.
  764. */
  765. Queue.prototype.getNextJob = function() {
  766.   var _this = this;
  767.  
  768.   if(this.closing){
  769.     return Promise.resolve();
  770.   }
  771.  
  772.   //
  773.   // Listen for new jobs, during moveToActive or after.
  774.   //
  775.   var resolve;
  776.   var newJobs = new Promise(function(_resolve){
  777.     // Needs to wrap to ignore the emitted value, or the promise will not resolve.
  778.     resolve = function(){
  779.       _resolve();
  780.     };
  781.     _this.on('added', resolve);
  782.     _this.on('global:resumed', resolve);
  783.     _this.on('wait-finished', resolve);
  784.   });
  785.  
  786.   var removeListeners = function(){
  787.     _this.removeListener('added', resolve);
  788.     _this.removeListener('global:resumed', resolve);
  789.     _this.removeListener('wait-finished', resolve);
  790.   };
  791.  
  792.   return scripts.moveToActive(this).spread(function(jobData, jobId){
  793.     if(jobData){
  794.       return Job.fromData(_this, jobData, jobId);
  795.     }else{
  796.       return newJobs;
  797.     }
  798.   }).finally(function(){
  799.     removeListeners();
  800.   });
  801. };
  802.  
  803. Queue.prototype.getJob = function(jobId){
  804.   return Job.fromId(this, jobId);
  805. };
  806.  
  807. // Job counts by type
  808. // Queue#getJobCountByTypes('completed') => completed count
  809. // Queue#getJobCountByTypes('completed,failed') => completed + failed count
  810. // Queue#getJobCountByTypes('completed', 'failed') => completed + failed count
  811. // Queue#getJobCountByTypes('completed,waiting', 'failed') => completed + waiting + failed count
  812. Queue.prototype.getJobCountByTypes = function() {
  813.   var _this = this;
  814.   var args = _.compact(Array.prototype.slice.call(arguments));
  815.   var types = _.compact(args.join(',').replace(/ /g, '').split(','));
  816.  
  817.   var multi = this.multi();
  818.  
  819.   _.each(types, function(type) {
  820.     var key = _this.toKey(type);
  821.     switch(type) {
  822.       case 'completed':
  823.       case 'failed':
  824.       case 'delayed':
  825.         multi.zcard(key);
  826.         break;
  827.       case 'active':
  828.       case 'wait':
  829.       case 'paused':
  830.         multi.llen(key);
  831.         break;
  832.     }
  833.   });
  834.  
  835.   return multi.exec().then(function(res){
  836.     return res.map(function(v) {
  837.       return v[1];
  838.     }).reduce(function(a, b) {
  839.       return a + b;
  840.     });      
  841.   }) || 0;
  842. };
  843.  
  844. /**
  845.  * Returns all the job counts for every list/set in the queue.
  846.  *
  847.  */
  848. Queue.prototype.getJobCounts = function(){
  849.   var types = ['waiting', 'active', 'completed', 'failed', 'delayed'];
  850.   var counts = {};
  851.   return this.client.multi()
  852.     .llen(this.toKey('wait'))
  853.     .llen(this.toKey('active'))
  854.     .zcard(this.toKey('completed'))
  855.     .zcard(this.toKey('failed'))
  856.     .zcard(this.toKey('delayed'))
  857.     .exec().then(function(result){
  858.       result.forEach(function(res, index){
  859.         counts[types[index]] = res[1] || 0;
  860.       });
  861.       return counts;
  862.     });
  863. };
  864.  
  865. Queue.prototype.getCompletedCount = function() {
  866.   return this.client.zcard(this.toKey('completed'));
  867. };
  868.  
  869. Queue.prototype.getFailedCount = function() {
  870.   return this.client.zcard(this.toKey('failed'));
  871. };
  872.  
  873. Queue.prototype.getDelayedCount = function() {
  874.   return this.client.zcard(this.toKey('delayed'));
  875. };
  876.  
  877. Queue.prototype.getActiveCount = function() {
  878.   return this.client.llen(this.toKey('active'));
  879. };
  880.  
  881. Queue.prototype.getWaitingCount = function() {
  882.   return this.client.llen(this.toKey('wait'));
  883. };
  884.  
  885. Queue.prototype.getPausedCount = function() {
  886.   return this.client.llen(this.toKey('paused'));
  887. };
  888.  
  889. Queue.prototype.getWaiting = function(start, end){
  890.   return Promise.join(
  891.     this.getJobs('wait', 'LIST', start, end),
  892.     this.getJobs('paused', 'LIST', start, end)).spread(function(waiting, paused){
  893.       return _.concat(waiting, paused);
  894.     });
  895. };
  896.  
  897. Queue.prototype.getActive = function(start, end){
  898.   return this.getJobs('active', 'LIST', start, end);
  899. };
  900.  
  901. Queue.prototype.getDelayed = function(start, end){
  902.   return this.getJobs('delayed', 'ZSET', start, end);
  903. };
  904.  
  905. Queue.prototype.getCompleted = function(start, end){
  906.   return this.getJobs('completed', 'ZSET', start, end);
  907. };
  908.  
  909. Queue.prototype.getFailed = function(start, end){
  910.   return this.getJobs('failed', 'ZSET', start, end);
  911. };
  912.  
  913. Queue.prototype.getJobs = function(queueType, type, start, end){
  914.   var _this = this;
  915.   var key = this.toKey(queueType);
  916.   var jobs;
  917.  
  918.   start = _.isUndefined(start) ? 0 : start;
  919.   end = _.isUndefined(end) ? -1 : end;
  920.  
  921.   switch(type){
  922.     case 'LIST':
  923.       jobs = this.client.lrange(key, start, end);
  924.       break;
  925.     case 'ZSET':
  926.       jobs = this.client.zrange(key, start, end);
  927.       break;
  928.   }
  929.  
  930.   return jobs.then(function(jobIds){
  931.     var jobsFromId = jobIds.map(_this.getJobFromId);
  932.     return Promise.all(jobsFromId);
  933.   });
  934. };
  935.  
  936. Queue.prototype.retryJob = function(job) {
  937.   return job.retry();
  938. };
  939.  
  940. Queue.prototype.toKey = function(queueType){
  941.   return [this.keyPrefix, this.name, queueType].join(':');
  942. };
  943.  
  944. /*@function clean
  945.  *
  946.  * Cleans jobs from a queue. Similar to remove but keeps jobs within a certian
  947.  * grace period.
  948.  *
  949.  * @param {int} grace - The grace period
  950.  * @param {string} [type=completed] - The type of job to clean. Possible values
  951.  * @param {int} The max number of jobs to clean
  952.  * are completed, waiting, active, delayed, failed. Defaults to completed.
  953.  */
  954. Queue.prototype.clean = function (grace, type, limit) {
  955.   var _this = this;
  956.  
  957.   if(grace === undefined || grace === null) {
  958.     return Promise.reject(new Error('You must define a grace period.'));
  959.   }
  960.  
  961.   if(!type) {
  962.     type = 'completed';
  963.   }
  964.  
  965.   if(_.indexOf([
  966.     'completed',
  967.     'wait',
  968.     'active',
  969.     'delayed',
  970.     'failed'], type) === -1){
  971.     return Promise.reject(new Error('Cannot clean unkown queue type'));
  972.   }
  973.  
  974.   return scripts.cleanJobsInSet(_this, type, Date.now() - grace, limit).then(function (jobs) {
  975.     _this.emit('cleaned', jobs, type);
  976.     return jobs;
  977.   }).catch(function (err) {
  978.     _this.emit('error', err);
  979.     throw err;
  980.   });
  981. };
  982.  
  983. /**
  984.  * Returns a promise that resolves when active jobs are cleared
  985.  *
  986.  * @returns {Promise}
  987.  */
  988. Queue.prototype.whenCurrentJobsFinished = function(){
  989.   var _this = this;
  990.  
  991.   _this.emit('wait-finished');
  992.   return new Promise(function(resolve){
  993.     Promise.all(_this.processing).finally(function(){
  994.       resolve();
  995.     });
  996.   });
  997. };
  998.  
  999. //
  1000. // Private local functions
  1001. //
  1002. var getRedisVersion = function getRedisVersion(client){
  1003.   return client.info().then(function(doc){
  1004.     var prefix = 'redis_version:';
  1005.     var lines = doc.split('\r\n');
  1006.     for(var i = 0; i < lines.length; i++){
  1007.       if(lines[i].indexOf(prefix) === 0){
  1008.         return lines[i].substr(prefix.length);
  1009.       }
  1010.     }
  1011.   });
  1012. };
  1013.  
// The module's only export is the Queue constructor.
module.exports = Queue;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement