Guest User

updated - resque

a guest
Oct 10th, 2016
37
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. var dnode = require('./node_modules/dnode');
  2. var _ = require('lodash');
  3. var schedule = require('node-schedule');
  4. var NR = require("node-resque");
  5. var promise = require('bluebird');
  6. var options = {
  7.     promiseLib: promise // override the default (es6 promise)
  8. };
  9. var pgp = require('./node_modules/pg-promise')(options);
  10. var cn = {
  11.     host: 'localhost',
  12.     port: 5432,
  13.     database: 'postgres',
  14.     user: 'postgres',
  15.     password: 'apassword'
  16. };
  17. var db = pgp(cn);
  18.  
  19.  
  20. /////////////////////////////
  21. // SET UP REDIS CONNECTION //
  22. /////////////////////////////
  23.  
  24. var connectionDetails = {
  25.  
  26.     pkg:       'ioredis',
  27.     host:      '127.0.0.1',
  28.     password:  null,
  29.     port:      6379,
  30.     database:  0,
  31.     // namespace: 'resque',
  32.     // looping: true,
  33.     // options: {password: 'abc'},
  34. };
  35.  
  36.  
  37. // var queue = new NR.queue({connection: connectionDetails}, jobs);
  38.  
  39. ///////////////////////
  40. // START A SCHEDULER //
  41. ///////////////////////
  42.  
  43. var scheduler = new NR.scheduler({connection: connectionDetails});
  44.     scheduler.connect(function(){
  45.         scheduler.start();
  46. });
  47.  
  48. scheduler.on('start',             function(){ console.log("scheduler started"); });
  49. scheduler.on('end',               function(){ console.log("scheduler ended"); });
  50. scheduler.on('poll',              function(){ console.log("scheduler polling"); });
  51. scheduler.on('master',            function(state){ console.log("scheduler became master"); });
  52. scheduler.on('error',             function(error){ console.log("scheduler error >> " + error); });
  53. scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); });
  54. scheduler.on('transferred_job',   function(timestamp, job){ console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job)); });
  55.  
  56.  
  57. var shutdown = function(){
  58.     scheduler.end(function(){
  59.         worker.end(function(){
  60.             console.log('bye.');
  61.             // db.query('UPDATE masteruser SET is_live = false')
  62.             // .then(function(data){
  63.             //  console.log('updated all entries in masteruser is_live to deactivated!');
  64.             process.exit();
  65.             // })
  66.         });
  67.     });
  68. };
  69.  
  70. process.on('SIGTERM', shutdown);
  71. process.on('SIGINT', shutdown);
  72.  
  73.  
  74. //////////////////
  75. // WORKER TASKS //
  76. //////////////////
  77.  
  78. var jobs = {
  79.     "followTask": {
  80.  
  81.         plugins: [ 'jobLock', 'retry' ],
  82.         pluginOptions: {
  83.           jobLock: {},
  84.           retry: {
  85.             retryLimit: 3,
  86.             retryDelay: (1000 * 5),
  87.           }
  88.         },
  89.  
  90.         perform: function(screen_name, oauthToken, oauthTokenSecret, callback){
  91.  
  92.             var gredis = new NR.gopiredis({connection: connectionDetails});
  93.                 gredis.connect(function(){
  94.                 gredis.popusertofollow(screen_name, function(usertofollow){
  95.                     dnode.connect(7070, function (remote, conn) {
  96.                         remote.postNewFollow(usertofollow, oauthToken, oauthTokenSecret, function (test) {
  97.  
  98.                             //TO DO ADD USER_ID
  99.                             db.query('INSERT INTO newfollows(screen_name, followed_screen_name, date_added, user_id) SELECT ${screen_name},${followed_screen_name},CURRENT_DATE,${user_id}', {                        
  100.                             screen_name: screen_name,
  101.                             followed_screen_name: usertofollow,
  102.                             user_id:test['id_str']
  103.                             })
  104.                             .then(function(data){
  105.                                 console.log('inserted into new follows table');
  106.                                 gredis.end(function(){console.log('gredis ended inside pop redis')})
  107.  
  108.                             })
  109.                         });
  110.                     });
  111.                 });
  112.             });
  113.             callback(null,screen_name);
  114.         },
  115.     },
  116.  
  117.     "populateRedis": {
  118.  
  119.         plugins: [ 'jobLock', 'retry' ],
  120.             pluginOptions: {
  121.               jobLock: {},
  122.               retry: {
  123.                 retryLimit: 3,
  124.                 retryDelay: (1000 * 5),
  125.               }
  126.             },     
  127.  
  128.         perform: function(screen_name, oauthToken, oauthTokenSecret, callback){
  129.             var dbcall1 = [];
  130.             var dbcall2 = [];
  131.             var differenced = [];
  132.             var sjob;
  133.  
  134.             db.any("select user_id from seeds where flag = 'true'")
  135.                     .then(function (data){    
  136.                         for(i=0; i<Object.keys(data).length; i++) {
  137.                             dbcall1.push(data[i]['user_id']);
  138.                         }
  139.                     })
  140.                     .catch(function (error) {console.log(error);});
  141.  
  142.             db.any("select user_id from newfollows where screen_name = ${sname}", {
  143.                 sname: screen_name
  144.             })
  145.                     .then(function (data){
  146.                         for(i=0; i<Object.keys(data).length; i++) {
  147.                             dbcall2.push(data[i]['user_id']);
  148.                         }
  149.                         differenced = _.difference(dbcall1,dbcall2);
  150.                        
  151.                         //ADD FOLLOWABLE USERS INTO A REDIS SET
  152.                         var gredis = new NR.gopiredis({connection: connectionDetails});
  153.                         gredis.connect(function(){
  154.                             gredis.adduserstofollow(screen_name, differenced);
  155.                         });
  156.                        
  157.                         //SCHEDULE      
  158.                         var sjob = schedule.scheduleJob(screen_name, '10,30,50 * * * * *', function(){
  159.                             var queue = new NR.queue({connection: connectionDetails}, jobs);
  160.                             queue.on('error', function(error){ console.log(error); });
  161.                             queue.connect(function(){  
  162.                                     if(scheduler.master){
  163.                                         console.log('>>> enquing scheduled job' + ':'+ screen_name);
  164.                                         queue.enqueue('nonpriority', "followTask", [screen_name, oauthToken, oauthTokenSecret]);
  165.                                         // queue.end(function(){console.log('queue connection closed')});
  166.                                     }
  167.                             });
  168.                         });
  169.                     })
  170.                     .catch(function (error) {console.log(error);});
  171.  
  172.             callback(null,screen_name);        
  173.         },
  174.     },
  175. };
  176.  
  177.  
  178. ///////////////////////////////
  179. // START ONE WORKER PROCESS ///
  180. ///////////////////////////////
  181.  
  182. var worker = new NR.worker({connection: connectionDetails, queues: ['priority', 'nonpriority']}, jobs);
  183. worker.connect(function(){
  184.     worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host
  185.     worker.start();
  186. });
  187.  
  188.  
  189. ///////////////////////////////
  190. // REGESTERING WORKER EVENTS //
  191. ///////////////////////////////
  192.  
  193. worker.on('start',           function(){ console.log("worker started"); });
  194. worker.on('end',             function(){ console.log("worker ended"); });
  195. worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); });
  196. worker.on('poll',            function(queue){ console.log("worker polling " + queue); });
  197. worker.on('job',             function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); });
  198. worker.on('reEnqueue',       function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); });
  199. worker.on('success',         function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); });
  200. worker.on('failure',         function(queue, job, failure){ console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); });
  201. worker.on('error',           function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); });
  202. worker.on('pause',           function(){ console.log("worker paused"); });
Add Comment
Please, Sign In to add comment