Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require('dotenv').load();
- var Redshift = require('node-redshift');
- var SQL = require('sql-template-strings');
- var Store = require('jfs');
- var sessions_db = new Store('../botkit/lib/storage/sessions');
- var users_db = new Store('../botkit/lib/storage/users');
- var client = {
- user: process.env.Redshift_user,
- database: process.env.Redshift_database,
- password: process.env.Redshift_password,
- port: process.env.Redshift_port,
- host: process.env.Redshift_host
- };
- var redshift = new Redshift(client);
- if (!process.env.Redshift_user) {
- console.log('Error: Specify Redshift_user in environment');
- process.exit(1);
- }
- if (!process.env.Redshift_database) {
- console.log('Error: Specify Redshift_database in environment');
- process.exit(1);
- }
- if (!process.env.Redshift_password) {
- console.log('Error: Specify Redshift_password in environment');
- process.exit(1);
- }
- if (!process.env.Redshift_port) {
- console.log('Error: Specify Redshift_port in environment');
- process.exit(1);
- }
- if (!process.env.Redshift_host) {
- console.log('Error: Specify Redshift_host in environment');
- process.exit(1);
- }
- var schedule = require('node-schedule');
- var rule = new schedule.RecurrenceRule();
- rule.second = [0, 15, 30, 45];
- function getSessions() {
- return new Promise(function (resolve, reject) {
- sessions_db.all(function (sessions) {
- return resolve(sessions);
- });;
- })
- }
- function getUsers() {
- return new Promise(function (resolve, reject) {
- users_db.all(function (users) {
- return resolve(users);
- });
- });
- }
- var j = schedule.scheduleJob(rule, function() {
- var dbSnapshot = {
- }
- return getSessions()
- .then(function(sessions) {
- dbSnapshot.sessions = sessions;
- return getUsers();
- })
- .then(function(users) {
- dbSnapshot.users = users;
- return;
- })
- .then(function() {
- var sessions_list = Object.keys(dbSnapshot.sessions);
- var users_list = Object.keys(dbSnapshot.users);
- var last_sessionIds = [];
- dbSnapshot.users.map(function(user) {
- let last_sessionId = user.last_session;
- // Construct an array with all the last sessions :
- last_sessionIds.push(last_sessionId);
- return _.filter(sessions_list, function(sessionId) {
- // remove ids from all last sessions
- return last_sessionId.indexOf(sessionId) > -1;
- });
- });
- })
- .then(function(sessionsToPushIds) {
- sessionsToPushIds.map(function (sessionId) {
- console.log(`pushing into db session ${sessionId}`;
- var start_time = sessions[sessionId].start_time;
- var user_uuid = sessions[sessionId].userid;
- var messages = sessions[sessionId].messages;
- //ADD redshift push
- redshift.query(SQL`INSERT INTO sessions (id, start_time, userid) VALUES (${sessionId},${start_time}, ${user_uuid})`, {raw: true})
- .then(function() {
- let batchInsert = "";
- dbSnapshot.messages.forEach(function(message) {
- let mess_uuid = message.mess_uuid;
- let content = message.content;
- let received_at = message.received_at;
- let intent = message.intent;
- let entity = message.entity;
- let resp_uuid = message.resp_uuid;
- batchInsert += `(${mess_uuid},${content},${received_at},${resp_uuid},${session_uuid}),`;
- })
- batchInsert = batchInsert.slice(0, -1);
- // TODO add error handling on maximum string size reached
- redshift.query(SQL`INSERT INTO messages (mess_uuid, content, received_at, resp_uuid, sessionid) VALUES ${batchInsert}`, {raw: true})
- .then(function(){
- console.log( messages[message] );
- return sessionId;
- })
- .catch(function(err){
- console.log(err);
- return sessionId;
- });
- })
- .then(function(sessionId) {
- sessions_db.delete(sessionId);
- });
- });
- });
- });
Add Comment
Please, Sign In to add comment