Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Third-party modules used throughout this file.
var dnode = require('./node_modules/dnode');
var _ = require('lodash');
var schedule = require('node-schedule');
var NR = require("node-resque");
var promise = require('bluebird');

// pg-promise initialisation options: swap the default ES6 promise
// implementation for bluebird.
var options = {
  promiseLib: promise
};
var pgp = require('./node_modules/pg-promise')(options);

// Postgres connection settings.
// NOTE(review): the credentials are hard-coded here; consider loading them
// from the environment before this is deployed anywhere shared.
var cn = {
  host: 'localhost',
  port: 5432,
  database: 'postgres',
  user: 'postgres',
  password: 'apassword'
};

// Shared database handle used by the job definitions below.
var db = pgp(cn);
/////////////////////////////
// SET UP REDIS CONNECTION //
/////////////////////////////
// Connection settings handed to every node-resque object created in this file
// (scheduler, worker, queues and the gopiredis helper).
// Currently unused settings, kept for reference:
//   namespace: 'resque'
//   looping: true
//   options: {password: 'abc'}
var connectionDetails = {
  pkg: 'ioredis',
  host: '127.0.0.1',
  password: null,
  port: 6379,
  database: 0
};

// A module-level queue is intentionally not created; queues are built on
// demand inside the scheduled callback of the populateRedis job.
// var queue = new NR.queue({connection: connectionDetails}, jobs);
///////////////////////
// START A SCHEDULER //
///////////////////////
// The scheduler is built on the shared redis settings and started as soon as
// its connection callback fires.
var scheduler = new NR.scheduler({connection: connectionDetails});
scheduler.connect(function onSchedulerConnected() {
  scheduler.start();
});

// One console logger per scheduler event, keyed by event name.
var schedulerEventLoggers = {
  start: function () {
    console.log("scheduler started");
  },
  end: function () {
    console.log("scheduler ended");
  },
  poll: function () {
    console.log("scheduler polling");
  },
  master: function (state) {
    console.log("scheduler became master");
  },
  error: function (error) {
    console.log("scheduler error >> " + error);
  },
  working_timestamp: function (timestamp) {
    console.log("scheduler working timestamp " + timestamp);
  },
  transferred_job: function (timestamp, job) {
    console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job));
  }
};

Object.keys(schedulerEventLoggers).forEach(function (eventName) {
  scheduler.on(eventName, schedulerEventLoggers[eventName]);
});
/**
 * Graceful shutdown: stop the scheduler, then the worker, then exit.
 *
 * `worker` is declared further down in this file; it is only dereferenced
 * when a signal arrives, by which time the declaration has run.
 */
var shutdown = function () {
  // Last step: say goodbye and terminate the process.
  // NOTE: a disabled draft used to run
  // `UPDATE masteruser SET is_live = false` through db.query before exiting.
  var exitProcess = function () {
    console.log('bye.');
    process.exit();
  };

  // Second step: stop the worker once the scheduler has ended.
  var stopWorker = function () {
    worker.end(exitProcess);
  };

  scheduler.end(stopWorker);
};

// Both termination signals trigger the same shutdown sequence.
['SIGTERM', 'SIGINT'].forEach(function (signalName) {
  process.on(signalName, shutdown);
});
//////////////////
// WORKER TASKS //
//////////////////
/**
 * Job definitions handed to the worker and to every queue created in this
 * file. Both jobs use the 'jobLock' and 'retry' plugins (3 retries, 5 seconds
 * apart), so each `perform` must only invoke `callback` once its asynchronous
 * work has finished, passing the error when something failed.
 */
var jobs = {
  "followTask": {
    plugins: [ 'jobLock', 'retry' ],
    pluginOptions: {
      jobLock: {},
      retry: {
        retryLimit: 3,
        retryDelay: (1000 * 5),
      }
    },
    /**
     * Takes one user from the redis helper for `screen_name`, asks the dnode
     * service on port 7070 to follow that user, and records the follow in the
     * `newfollows` table.
     *
     * @param {string} screen_name - account the follow is performed for.
     * @param {string} oauthToken - forwarded to the remote `postNewFollow`.
     * @param {string} oauthTokenSecret - forwarded to the remote `postNewFollow`.
     * @param {function} callback - node-style `(error, result)`; the result is
     *   `screen_name`. Called exactly once, after the work completed or failed.
     */
    perform: function (screen_name, oauthToken, oauthTokenSecret, callback) {
      var gredis = new NR.gopiredis({connection: connectionDetails});
      var settled = false;

      // Releases the redis helper and reports the outcome exactly once, even
      // if both an 'error' event and a query result arrive.
      var finish = function (error) {
        if (settled) {
          return;
        }
        settled = true;
        gredis.end(function () { console.log('gredis ended inside pop redis'); });
        if (error) {
          callback(error);
        } else {
          callback(null, screen_name);
        }
      };

      gredis.connect(function () {
        gredis.popusertofollow(screen_name, function (usertofollow) {
          // NOTE(review): presumably a falsy value means there is nobody left
          // to follow for this account -- confirm against gopiredis.
          if (!usertofollow) {
            console.log('no user left to follow for ' + screen_name);
            finish();
            return;
          }

          var client = dnode.connect(7070, function (remote, conn) {
            remote.postNewFollow(usertofollow, oauthToken, oauthTokenSecret, function (test) {
              // The remote call has answered; the dnode connection is no
              // longer needed and would otherwise stay open for every job.
              conn.end();

              if (test === null || test === undefined) {
                finish(new Error('postNewFollow returned no user for ' + usertofollow));
                return;
              }

              db.query('INSERT INTO newfollows(screen_name, followed_screen_name, date_added, user_id) SELECT ${screen_name},${followed_screen_name},CURRENT_DATE,${user_id}', {
                screen_name: screen_name,
                followed_screen_name: usertofollow,
                user_id: test['id_str']
              })
                .then(function () {
                  console.log('inserted into new follows table');
                  finish();
                }, function (error) {
                  console.log(error);
                  finish(error);
                });
            });
          });

          // Without a listener a connection failure is thrown as an uncaught
          // 'error' event; fail the job instead so the retry plugin can act.
          client.on('error', finish);
        });
      });
    },
  },
  "populateRedis": {
    plugins: [ 'jobLock', 'retry' ],
    pluginOptions: {
      jobLock: {},
      retry: {
        retryLimit: 3,
        retryDelay: (1000 * 5),
      }
    },
    /**
     * Computes the seed user ids that `screen_name` has not followed yet,
     * hands them to the redis helper, and schedules a recurring "followTask"
     * enqueue (at seconds 10, 30 and 50 of every minute) for that account.
     *
     * @param {string} screen_name - account to prepare; also the schedule name.
     * @param {string} oauthToken - passed through to the scheduled followTask.
     * @param {string} oauthTokenSecret - passed through to the scheduled followTask.
     * @param {function} callback - node-style `(error, result)`; the result is
     *   `screen_name`. Called once the queries finished and the schedule is set.
     */
    perform: function (screen_name, oauthToken, oauthTokenSecret, callback) {
      var pluckUserId = function (row) {
        return row['user_id'];
      };

      // Both result sets are needed before the difference can be computed, so
      // wait for the two independent queries together.
      promise.all([
        db.any("select user_id from seeds where flag = 'true'"),
        db.any("select user_id from newfollows where screen_name = ${sname}", {
          sname: screen_name
        })
      ])
        .then(function (results) {
          var seedIds = results[0].map(pluckUserId);
          var followedIds = results[1].map(pluckUserId);
          var differenced = _.difference(seedIds, followedIds);

          //ADD FOLLOWABLE USERS INTO A REDIS SET
          // NOTE(review): this helper connection is never ended because it is
          // not visible here how adduserstofollow signals completion.
          var gredis = new NR.gopiredis({connection: connectionDetails});
          gredis.connect(function () {
            gredis.adduserstofollow(screen_name, differenced);
          });

          //SCHEDULE
          // Replace any schedule already registered under this name so that
          // re-running the job does not stack duplicate timers.
          var previous = schedule.scheduledJobs && schedule.scheduledJobs[screen_name];
          if (previous) {
            previous.cancel();
          }

          schedule.scheduleJob(screen_name, '10,30,50 * * * * *', function () {
            var queue = new NR.queue({connection: connectionDetails}, jobs);
            // Every tick opens its own queue connection; close it when done.
            var closeQueue = function () {
              queue.end(function () { console.log('queue connection closed'); });
            };

            queue.on('error', function (error) { console.log(error); });
            queue.connect(function () {
              if (!scheduler.master) {
                closeQueue();
                return;
              }
              console.log('>>> enquing scheduled job' + ':' + screen_name);
              queue.enqueue('nonpriority', "followTask", [screen_name, oauthToken, oauthTokenSecret], function (error) {
                if (error) {
                  console.log(error);
                }
                closeQueue();
              });
            });
          });
        })
        .then(function () {
          callback(null, screen_name);
        }, function (error) {
          console.log(error);
          callback(error);
        });
    },
  },
};
///////////////////////////////
// START ONE WORKER PROCESS ///
///////////////////////////////
// A single worker serving both queues, using the job table defined above.
var worker = new NR.worker({connection: connectionDetails, queues: ['priority', 'nonpriority']}, jobs);
worker.connect(function onWorkerConnected() {
  // Optional: clean up any previous improperly shut down workers on this host.
  worker.workerCleanup();
  worker.start();
});

///////////////////////////////
// REGISTERING WORKER EVENTS //
///////////////////////////////
// One console logger per worker event, keyed by event name.
var workerEventLoggers = {
  start: function () {
    console.log("worker started");
  },
  end: function () {
    console.log("worker ended");
  },
  cleaning_worker: function (staleWorker, pid) {
    console.log("cleaning old worker " + staleWorker);
  },
  poll: function (queue) {
    console.log("worker polling " + queue);
  },
  job: function (queue, job) {
    console.log("working job " + queue + " " + JSON.stringify(job));
  },
  reEnqueue: function (queue, job, plugin) {
    console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job));
  },
  success: function (queue, job, result) {
    console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result);
  },
  failure: function (queue, job, failure) {
    console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure);
  },
  error: function (queue, job, error) {
    console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error);
  },
  pause: function () {
    console.log("worker paused");
  }
};

Object.keys(workerEventLoggers).forEach(function (eventName) {
  worker.on(eventName, workerEventLoggers[eventName]);
});
Add Comment
Please, Sign In to add comment