Advertisement
Guest User

Icover

a guest
May 24th, 2019
133
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. 'use strict';
  2.  
  3. /**
  4.  * Icover API
  5.  *
  6.  * @use             {{{
  7.  *      $ node app.js [<debug> [<timeout>]]
  8.  * }}}
  9.  *
  10.  * @comment         Must be started at least 1 hour after the `launchCluster.sh` script (to start the Spark streaming job on the cluster)
  11.  *
  12.  * @param debug     Set to 'true' to get extra logs (default: false)
  13.  * @param timeout   In milliseconds (default: 5000)
  14.  */
  15.  
  16. // Import statements
  17. var express = require('express');
  18. var bodyParser = require('body-parser');
  19. var timeout = require('connect-timeout');
  20. var kafka = require('kafka-node');
  21. var exec = require('child_process').exec;
  22.  
  23. // Read arguments
  24. const debug = process.argv.length > 2 ? process.argv[2] == 'true' : false;
  25. if (debug) console.log('Debug mode is on');
  26. const TIMEOUT = process.argv.length > 3 ? parseInt(process.argv[3]) : 5000;
  27. console.log('Timeout is set at ' + TIMEOUT + ' ms');
  28.  
  29. // Initiate constants
  30. const TOPIC_MATCH = debug ? 'tests' : 'realtime-match',
  31.       TOPIC_VALID = debug ? 'tests' : 'realtime-valid';
  32. console.log('Kafka topics: (match) ' + TOPIC_MATCH + ' / (valid) ' + TOPIC_VALID);
  33.  
  34. // Create the application endpoint
  35. var app = express();
  36. app.use(timeout(TIMEOUT));
  37. app.use(bodyParser.json());
  38. app.use(bodyParser.urlencoded({extended: true}));
  39.  
  40. // Default message at root URL
  41. app.get('/',function(req, res){
  42.     res.status(501).send('<h1>501 Not Implemented</h1><p>Unknown method or arguments</p>');
  43. });
  44.  
  45. // Utilities
  46. function str_pad(n) { return n < 10 ? '0' + n : n.toString(); }
  47. function year4Str(n) {
  48.     if (n > 1900) return n.toString();
  49.     else {
  50.         if (n > 20) return '19' + n;
  51.         else return (2000+ n).toString();
  52.     }
  53. }
  54.  
  55. // Handle incoming data, ie. first call
  56. app.post('/validate', function(req, res, next) {
  57.     // Instantiate Kafka Producer
  58.     let Producer = kafka.Producer,
  59.         client = new kafka.KafkaClient({kafkaHost:'edge-node-1.icover-services.com:6667'}),
  60.         producer = new Producer(client);
  61.     producer.on('ready', function () {
  62.         console.log('Producer is ready');
  63.     });
  64.     producer.on('error', function (err) {
  65.         console.log('Producer is in error state');
  66.         console.log(err);
  67.     });
  68.     try {
  69.         if (req.body.RecordID) {
  70.             let referenceId = req.body.RecordID.toString(),
  71.                 date = '',
  72.                 telephone = '';
  73.             if (req.body.DayOfBirth && req.body.MonthOfBirth && req.body.YearOfBirth)
  74.                 date = str_pad(parseInt(req.body.DayOfBirth)) + '/' + str_pad(parseInt(req.body.MonthOfBirth)) + '/' + year4Str(req.body.YearOfBirth);
  75.             if (!req.body.Telephone && req.body.Telephone2)
  76.                 telephone = req.body.Telephone2;
  77.             else if (req.body.Telephone)
  78.                 telephone = req.body.Telephone;
  79.             let line = [req.body.LastName, req.body.FirstName, req.body.Address1, req.body.HouseNumber, req.body.StreetName, req.body.PostalCode, date, req.body.City, telephone, req.body.CellNumber, req.body.EmailAddress].join(';');
  80.             let command = 'java -cp InputParser-latest.jar com.tutsht.inputparser.SingleParserApp 0 inputMapper.json 1 ' + referenceId + ' true "' + line + '"';
  81.             if (debug) console.log('Executing: ' + command);
  82.             exec(command, (error, stdout, stderr) => {
  83.                 if (error) {
  84.                     throw error;
  85.                 } else {
  86.                     console.log('SingleParserApp: ' + stderr);
  87.                     if (stderr.startsWith('Processed') && stdout.startsWith(referenceId)) {
  88.                         let payload = [{topic: TOPIC_MATCH, messages:stdout, partition: 0}];
  89.                         producer.send(payload, function (err, data) {
  90.                             if (debug) console.log('data:', data);
  91.                             if (data[TOPIC_MATCH]['0'] > 0) {
  92.                                 let offset = new kafka.Offset(new kafka.KafkaClient({kafkaHost:'edge-node-1.icover-services.com:6667'}));
  93.                                 offset.fetch([{ topic: TOPIC_VALID, partition: 0, time: -1, maxNum: 1 }], function (err, data) {
  94.                                     let latestOffset = Math.max(0, data[TOPIC_VALID]['0'][0] - 1);
  95.                                     if (debug) console.log('latestOffset for TOPIC_VALID: ' + latestOffset);
  96.                                     let token = 'token=' + encodeURIComponent(referenceId + "~" + latestOffset);
  97.                                     res.send(token);
  98.                                 });
  99.                             } else throw new Error('Kafka Producer error');
  100.                         });
  101.                     } else throw new Error('SingleParserApp error');
  102.                 }
  103.             });
  104.         } else throw new Error('Missing data error');
  105.     } catch(e) {
  106.         console.log(e);
  107.         console.log(req.body);
  108.         res.status(400).send('error');
  109.     }
  110. });
  111.  
  112. // Handle the result of the validation process, ie. the second call to repeat ad lib when timed out
  113. app.get('/validate', function(req, res, next) {
  114.     let latestOffset;
  115.     try {
  116.         if (debug) console.log('Token: ' + req.query.token);
  117.         let parts = req.query.token.split('~');
  118.         let referenceId = parts[0];
  119.         latestOffset = parseInt(parts[1]);
  120.         if (debug) console.log('Offset: ' + latestOffset);
  121.         const consumer = new kafka.Consumer(new kafka.KafkaClient({kafkaHost:'edge-node-1.icover-services.com:6667'}),
  122.             [{topic: TOPIC_VALID, offset: latestOffset}],
  123.             {autoCommit: false, fromOffset: true}
  124.         );
  125.         consumer.on('message', function(message) {
  126.             if (message.value.startsWith(referenceId) && !res.headersSent) {
  127.                 res.send(message.value);
  128.             }
  129.         });
  130.         consumer.on('error', function (err) {
  131.             console.log(err);
  132.             res.status(400).send('error');
  133.         });
  134.         consumer.on('offsetOutOfRange', function (err) {
  135.             console.log(err);
  136.             res.status(400).send('error');
  137.         });
  138.     } catch (e) {
  139.         console.log(e);
  140.         console.log(req.query, 'offset: ' + latestOffset);
  141.         res.status(400).send('error');
  142.     }
  143. });
  144.  
  145. // Launch application...
  146. app.listen(5001, function(){
  147.     console.log('Icover API running on port 5001');
  148. });
  149.  
  150. // ... and finally handle timeout
  151. app.use(haltOnTimedout);
  152. function haltOnTimedout(err, req, res, next) {
  153.     if (req.timedout === true) {
  154.         if (res.headersSent) {
  155.             next(err);
  156.         } else {
  157.             res.status(408).send('timeout');
  158.         }
  159.     } else {
  160.         next();
  161.     }
  162. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement