Advertisement
Guest User

Untitled

a guest
Jul 6th, 2015
160
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.14 KB | None | 0 0
  1. 'use strict'
  2. const kue = require('kue')
  3. const cluster = require('cluster')
  4. const fs = require('mz/fs')
  5. const co = require('co')
  6. const highland = require('highland')
  7. const log = require('npmlog')
  8.  
  if (cluster.isMaster) {
    // Master process: open the queue, log its job events, start a worker
    // and submit a single test job.
    const promise = co(function* () {
      const queue = yield createQueue()
      queue.on('job enqueue', function (id) {
        log.info('master', `job ${id} enqueued`)
      }).on('job complete', function (id) {
        log.info('master', `job ${id} finished`)
      }).on('job failed', function (id, error) {
        log.error('master', `job ${id} failed (${error})`)
      })
      // Start the worker process before the job is submitted.
      yield forkChildren()
      // The value returned here is what the promise settles with.
      return yield createJob(queue, 'testJob', { 'foo': 'bar' })
    })
    // Route the promise through a highland stream: print the job result on
    // success, print the error on failure. resume() starts the flow.
    highland(promise)
      .map(function (result) {
        console.log(result)
      })
      .errors(function (error) {
        console.error(error)
      })
      .resume()
  } else {
    // Worker process: connect to the queue and handle 'testJob' jobs.
    co(function* () {
      const queue = yield createQueue()
      queue.process('testJob', function (job, finished) {
        log.info(`child ${process.pid}`, `starting ${job.type}`)
        // Stand-in for real work: report success after one second.
        setTimeout(function () {
          log.info(`child ${process.pid}`, `finished ${job.type}`)
          finished(null, 'it worked!')
        }, 1000)
      })
    }).catch(function (error) {
      // Without a rejection handler, a failure while creating the queue or
      // registering the processor would be dropped and the worker would sit
      // idle with no indication of why.
      log.error(`child ${process.pid}`, `worker setup failed (${error})`)
    })
  }
  45.  
  /**
   * Fork worker processes with cluster.fork().
   *
   * Declared as a generator so it can be used as `yield forkChildren()`;
   * it never yields itself and completes on the first step.
   *
   * @param {number} [count] - number of workers to fork; one when omitted.
   * @returns {Array} the values returned by cluster.fork(), in fork order
   *   (delivered as the generator's return value).
   */
  function *forkChildren(count) {
    const total = count === undefined ? 1 : count
    const workers = []
    for (let i = 0; i < total; i += 1) {
      workers.push(cluster.fork())
    }
    return workers
  }
  49.  
  /**
   * Build a queue connected to the configured Redis host.
   *
   * Yields the result of getRedisHost() so the driver (co, in this file)
   * can resolve it, then passes the resolved host to _createQueue.
   *
   * @returns {*} whatever _createQueue returns for that host
   *   (delivered as the generator's return value).
   */
  function *createQueue() {
    return _createQueue(yield getRedisHost())
  }
  54.  
  /**
   * Read the Redis host name from ./etc/.redis.
   *
   * Yields the promise returned by fs.readFile and expects to be resumed
   * with the file contents.
   *
   * @returns {string} the file contents as text with surrounding whitespace
   *   removed (delivered as the generator's return value).
   */
  function *getRedisHost() {
    const buffer = yield fs.readFile('./etc/.redis')
    // Text files normally end with a newline; left in place it would become
    // part of the host name handed to the Redis client.
    return buffer.toString().trim()
  }
  59.  
  /**
   * Create a kue queue whose Redis connection points at the given host.
   *
   * @param {string} redisHost - host passed to kue as `redis.host`.
   * @returns {*} the value returned by kue.createQueue.
   */
  function _createQueue(redisHost) {
    // kue offers no way to wait for its Redis client; the alternative would
    // be to create the client ourselves and pass it to kue.
    const redis = { host: redisHost }
    const settings = { redis: redis }
    return kue.createQueue(settings)
  }
  69.  
  /**
   * Create a job on the queue, save it, and wait for its outcome.
   *
   * @param {Object} queue - object exposing create(name, options); a kue
   *   queue in this file.
   * @param {string} name - job type passed to queue.create.
   * @param {Object} options - job data passed to queue.create.
   * @returns {Promise} resolves with the value delivered by the job's
   *   'complete' event; rejects with the value delivered by its 'failed'
   *   event, or with the error reported by job.save.
   */
  function createJob(queue, name, options) {
    return new Promise(function (resolve, reject) {
      const job = queue.create(name, options)
      job.on('complete', function (result) {
        resolve(result)
      })
      job.on('failed', function (errorMessage) {
        reject(errorMessage)
      })
      // If saving fails, neither 'complete' nor 'failed' will ever fire for
      // this job, so the save error has to settle the promise itself.
      job.save(function (error) {
        if (error) {
          reject(error)
        }
      })
    })
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement