Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- 'use strict'
- const kue = require('kue')
- const cluster = require('cluster')
- const fs = require('mz/fs')
- const co = require('co')
- const highland = require('highland')
- const log = require('npmlog')
// Entry point. The same script runs in two roles, selected by cluster.isMaster:
//  - master: opens the queue, logs queue-level job events, forks a worker,
//    then enqueues a single 'testJob' and prints its outcome;
//  - child:  opens its own queue handle and processes 'testJob' jobs.
if (cluster.isMaster) {
  // Inside master process, so create a queue and spin up some children
  const promise = co(function* () {
    const queue = yield createQueue()
    queue.on('job enqueue', (id) => {
      log.info('master', `job ${id} enqueued`)
    }).on('job complete', (id) => {
      log.info('master', `job ${id} finished`)
    }).on('job failed', (id, error) => {
      log.error('master', `job ${id} failed (${error})`)
    })
    // forkChildren is a generator function; yielding its generator hands it
    // to co, which drives it before the job is created.
    yield forkChildren()
    return yield createJob(queue, 'testJob', { 'foo': 'bar' })
  })
  // Feed the promise into a highland stream: the resolved value is printed by
  // map(), a rejection is printed by errors(), and resume() starts the flow.
  highland(promise)
    .map((result) => {
      console.log(result)
    })
    .errors((error) => {
      console.error(error)
    })
    .resume()
} else {
  // Inside child process, some intensive work is done here or whatever
  co(function* () {
    const queue = yield createQueue()
    queue.process('testJob', (job, finished) => {
      log.info(`child ${process.pid}`, `starting ${job.type}`)
      // Simulated work: report success after one second.
      setTimeout(() => {
        log.info(`child ${process.pid}`, `finished ${job.type}`)
        finished(null, 'it worked!')
      }, 1000)
    })
  }).catch((error) => {
    // Previously this promise had no rejection handler, so a failure while
    // creating the queue was never reported by this script. Log it and mark
    // the worker as failed instead.
    log.error(`child ${process.pid}`, `could not start worker (${error})`)
    process.exitCode = 1
  })
}
/**
 * Forks worker processes with cluster.fork().
 *
 * Declared as a generator function (it contains no yield points) so callers
 * can keep writing `yield forkChildren()` inside a co coroutine.
 *
 * @param {number} [count=1] - Number of workers to fork. The default of one
 *   matches the previous hard-coded behaviour.
 * @returns {Array} The values returned by cluster.fork(), in fork order
 *   (delivered as the generator's return value).
 */
function *forkChildren(count = 1) {
  const workers = []
  for (let i = 0; i < count; i += 1) {
    workers.push(cluster.fork())
  }
  return workers
}
/**
 * Coroutine that resolves the Redis host via getRedisHost() and passes it to
 * _createQueue(). Meant to be driven by co (`yield createQueue()`).
 *
 * @returns {*} Whatever _createQueue() returns for the resolved host.
 */
function *createQueue() {
  return _createQueue(yield getRedisHost())
}
/**
 * Coroutine that reads the Redis host name from ./etc/.redis (resolved
 * against the process working directory).
 *
 * Yields the promise returned by fs.readFile and expects the file contents to
 * be sent back in, as co does when driving the generator.
 *
 * @returns {string} The file contents as a string with surrounding whitespace
 *   removed, so a trailing newline in the file does not end up in the host
 *   name.
 */
function *getRedisHost() {
  const buffer = yield fs.readFile('./etc/.redis')
  return buffer.toString().trim()
}
/**
 * Creates a kue queue whose Redis connection points at the given host.
 *
 * @param {string} redisHost - Host name passed to kue as `redis.host`.
 * @returns {*} The value returned by kue.createQueue().
 */
function _createQueue(redisHost) {
  // Original author's note: kue offers no way to wait for its Redis client to
  // be ready; an alternative would be to create the client ourselves and hand
  // it to kue.
  const redis = { host: redisHost }
  return kue.createQueue({ redis })
}
/**
 * Creates a job on the queue, saves it, and reports its outcome as a promise.
 *
 * @param {Object} queue - Queue exposing `create(name, options)`.
 * @param {string} name - Job type passed to `queue.create`.
 * @param {Object} options - Job data passed to `queue.create`.
 * @returns {Promise<*>} Resolves with the value of the job's 'complete'
 *   event. Rejects with the value of the job's 'failed' event (passed through
 *   unchanged), or with the error reported by `job.save` if the job could not
 *   be saved.
 */
function createJob(queue, name, options) {
  return new Promise((resolve, reject) => {
    const job = queue.create(name, options)
    job.on('complete', (result) => {
      resolve(result)
    })
    job.on('failed', (errorMessage) => {
      reject(errorMessage)
    })
    // Without a save callback a failed save would leave this promise pending
    // forever, because neither 'complete' nor 'failed' would ever fire.
    job.save((error) => {
      if (error) {
        reject(error)
      }
    })
  })
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement