Guest User

Untitled

a guest
Aug 29th, 2017
30
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. require('dotenv').load();
  2. var Redshift = require('node-redshift');
  3. var SQL = require('sql-template-strings');
  4. var Store = require('jfs');
  5.  
  6. var sessions_db = new Store('../botkit/lib/storage/sessions');
  7. var users_db = new Store('../botkit/lib/storage/users');
  8.  
  9. var client = {
  10.     user: process.env.Redshift_user,
  11.     database: process.env.Redshift_database,
  12.     password: process.env.Redshift_password,
  13.     port: process.env.Redshift_port,
  14.     host: process.env.Redshift_host
  15. };
  16.  
  17. var redshift  = new Redshift(client);
  18.  
  19. if (!process.env.Redshift_user) {
  20.     console.log('Error: Specify Redshift_user in environment');
  21.     process.exit(1);
  22. }
  23.  
  24. if (!process.env.Redshift_database) {
  25.     console.log('Error: Specify Redshift_database in environment');
  26.     process.exit(1);
  27. }
  28.  
  29. if (!process.env.Redshift_password) {
  30.     console.log('Error: Specify Redshift_password in environment');
  31.     process.exit(1);
  32. }
  33.  
  34. if (!process.env.Redshift_port) {
  35.     console.log('Error: Specify Redshift_port in environment');
  36.     process.exit(1);
  37. }
  38.  
  39. if (!process.env.Redshift_host) {
  40.     console.log('Error: Specify Redshift_host in environment');
  41.     process.exit(1);
  42. }
  43.  
  44.  
  45. var schedule = require('node-schedule');
  46.  
  47. var rule = new schedule.RecurrenceRule();
  48.  
  49. rule.second = [0, 15, 30, 45];
  50.  
  51. function getSessions() {
  52.     return new Promise(function (resolve, reject) {
  53.         sessions_db.all(function (sessions) {
  54.             return resolve(sessions);
  55.         });;
  56.     })
  57. }
  58.  
  59. function getUsers() {
  60.     return new Promise(function (resolve, reject) {
  61.         users_db.all(function (users) {
  62.             return resolve(users);
  63.         });
  64.     });
  65. }
  66.  
  67. var j = schedule.scheduleJob(rule, function() {
  68.     var dbSnapshot = {
  69.     }
  70.  
  71.     return getSessions()
  72.     .then(function(sessions) {
  73.         dbSnapshot.sessions = sessions;
  74.         return getUsers();
  75.     })
  76.     .then(function(users) {
  77.         dbSnapshot.users = users;
  78.         return;
  79.     })
  80.     .then(function() {
  81.         var sessions_list = Object.keys(dbSnapshot.sessions);
  82.         var users_list = Object.keys(dbSnapshot.users);
  83.  
  84.         var last_sessionIds = [];
  85.  
  86.         dbSnapshot.users.map(function(user) {
  87.             let last_sessionId = user.last_session;
  88.  
  89.             // Construct an array with all the last sessions :
  90.             last_sessionIds.push(last_sessionId);
  91.  
  92.             return _.filter(sessions_list, function(sessionId) {
  93.                 // remove ids from all last sessions
  94.                 return last_sessionId.indexOf(sessionId) > -1;
  95.             });
  96.         });
  97.     })
  98.     .then(function(sessionsToPushIds) {
  99.         sessionsToPushIds.map(function (sessionId) {
  100.             console.log(`pushing into db session ${sessionId}`;
  101.  
  102.                 var start_time = sessions[sessionId].start_time;
  103.                 var user_uuid = sessions[sessionId].userid;
  104.                 var messages = sessions[sessionId].messages;
  105.  
  106.                 //ADD redshift push
  107.                 redshift.query(SQL`INSERT INTO sessions (id, start_time, userid) VALUES (${sessionId},${start_time}, ${user_uuid})`, {raw: true})
  108.                 .then(function() {
  109.                     let batchInsert = "";
  110.  
  111.                     dbSnapshot.messages.forEach(function(message) {
  112.                         let mess_uuid = message.mess_uuid;
  113.                         let content = message.content;
  114.                         let received_at = message.received_at;
  115.                         let intent = message.intent;
  116.                         let entity = message.entity;
  117.                         let resp_uuid = message.resp_uuid;
  118.  
  119.                         batchInsert += `(${mess_uuid},${content},${received_at},${resp_uuid},${session_uuid}),`;
  120.                     })
  121.  
  122.                     batchInsert = batchInsert.slice(0, -1);
  123.  
  124.                     // TODO add error handling on maximum string size reached
  125.                     redshift.query(SQL`INSERT INTO messages (mess_uuid, content, received_at, resp_uuid, sessionid) VALUES ${batchInsert}`, {raw: true})
  126.                     .then(function(){
  127.                         console.log( messages[message] );
  128.                         return sessionId;
  129.                     })
  130.                     .catch(function(err){
  131.                         console.log(err);
  132.                         return sessionId;
  133.                     });
  134.                 })
  135.                 .then(function(sessionId) {
  136.                     sessions_db.delete(sessionId);
  137.                 });
  138.             });
  139.     });
  140. });
Add Comment
Please, Sign In to add comment