Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- 'use strict';
- /**
- * Icover API
- *
- * @use {{{
- * $ node app.js [<debug> [<timeout>]]
- * }}}
- *
- * @comment Must be started at least 1 hour after the `launchCluster.sh` script (to start the Spark streaming job on the cluster)
- *
- * @param debug Set to 'true' to get extra logs (default: false)
- * @param timeout In milliseconds (default: 5000)
- */
- // Import statements
- var express = require('express');
- var bodyParser = require('body-parser');
- var timeout = require('connect-timeout');
- var kafka = require('kafka-node');
- var exec = require('child_process').exec;
// Read arguments
// argv[2]: debug flag, enabled only by the exact string 'true'.
// argv[3]: request timeout in milliseconds.
const debug = process.argv[2] === 'true';
if (debug) console.log('Debug mode is on');
// Fall back to 5000 ms when the argument is missing, non-numeric or negative,
// so that NaN is never handed to the timeout middleware.
const TIMEOUT = (function () {
  const parsed = Number.parseInt(process.argv[3], 10);
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : 5000;
})();
console.log('Timeout is set at ' + TIMEOUT + ' ms');
// Initiate constants
// In debug mode both the "match" and "valid" streams use the 'tests' topic.
const TOPIC_MATCH = debug ? 'tests' : 'realtime-match';
const TOPIC_VALID = debug ? 'tests' : 'realtime-valid';
console.log('Kafka topics: (match) ' + TOPIC_MATCH + ' / (valid) ' + TOPIC_VALID);
// Build the Express application and install the shared middleware:
// request timeout first, then the JSON and URL-encoded body parsers.
var app = express();
[
  timeout(TIMEOUT),
  bodyParser.json(),
  bodyParser.urlencoded({extended: true}),
].forEach((middleware) => {
  app.use(middleware);
});
// The root URL exposes nothing: always answer 501.
app.get('/', (req, res) => {
  res
    .status(501)
    .send('<h1>501 Not Implemented</h1><p>Unknown method or arguments</p>');
});
- // Utilities
/**
 * Render a number as a string, prefixing a single '0' when it is below 10.
 *
 * @param {number} n Value to format (callers pass the result of parseInt).
 * @returns {string} '0' + n when n < 10, otherwise n.toString().
 */
function str_pad(n) {
  if (n < 10) {
    return `0${n}`;
  }
  return n.toString();
}
/**
 * Expand a year to its 4-digit string form.
 *
 * Two-digit years pivot at 20: values 0..20 map to 2000..2020 and values
 * 21..99 map to 1921..1999. Years of 100 or more are returned unchanged.
 *
 * The input is parsed as a base-10 integer first, because request bodies may
 * carry the year as a string; without that, `2000 + "5"` concatenates into
 * "20005" instead of adding.
 *
 * @param {number|string} n Year as received from the request.
 * @returns {string} The expanded year, or String(n) when n is not numeric.
 */
function year4Str(n) {
  const year = Number.parseInt(n, 10);
  if (Number.isNaN(year)) return String(n);
  if (year >= 100) return year.toString();
  if (year > 20) return (1900 + year).toString();
  return (2000 + year).toString();
}
// Handle incoming data, ie. first call
/**
 * POST /validate
 *
 * Flow:
 *  1. Build a ';'-separated record from the request body.
 *  2. Run the SingleParserApp Java program on that record.
 *  3. Publish the parser's stdout to TOPIC_MATCH (partition 0).
 *  4. Look up the latest offset of TOPIC_VALID and answer with
 *     'token=<urlencoded "RecordID~offset">', which the client passes to
 *     GET /validate afterwards.
 *
 * Any failure is logged together with the request body and answered with
 * HTTP 400 'error'. Errors raised inside asynchronous callbacks are routed to
 * the same path rather than thrown, because a throw there is not caught by
 * the surrounding try/catch and would take the process down.
 */
app.post('/validate', function(req, res, next) {
  // Local import: execFile passes arguments as an array and spawns no shell,
  // so request fields cannot inject shell syntax into the command.
  const execFile = require('child_process').execFile;
  const body = req.body || {};
  let client = null;     // Kafka connection owned by this request
  let finished = false;  // true once this handler has answered or given up

  // Close the Kafka connection opened for this request (at most once).
  function release() {
    if (client) {
      const owned = client;
      client = null;
      owned.close(function () {});
    }
  }

  // Log the failure and answer 400 unless a response was already sent.
  function fail(err) {
    if (finished) return;
    finished = true;
    console.log(err);
    console.log(req.body);
    release();
    if (!res.headersSent) res.status(400).send('error');
  }

  // True when the request was already answered, either by this handler or by
  // the timeout handler; in that case just free the resources.
  function stopIfOver() {
    if (finished || res.headersSent) {
      finished = true;
      release();
      return true;
    }
    return false;
  }

  if (!body.RecordID) {
    fail(new Error('Missing data error'));
    return;
  }

  try {
    const referenceId = body.RecordID.toString();
    let date = '';
    if (body.DayOfBirth && body.MonthOfBirth && body.YearOfBirth) {
      date = str_pad(parseInt(body.DayOfBirth, 10)) + '/' +
        str_pad(parseInt(body.MonthOfBirth, 10)) + '/' +
        year4Str(body.YearOfBirth);
    }
    // Primary phone number, falling back to the secondary one.
    const telephone = body.Telephone || body.Telephone2 || '';
    const line = [
      body.LastName, body.FirstName, body.Address1, body.HouseNumber,
      body.StreetName, body.PostalCode, date, body.City, telephone,
      body.CellNumber, body.EmailAddress
    ].join(';');
    const args = [
      '-cp', 'InputParser-latest.jar', 'com.tutsht.inputparser.SingleParserApp',
      '0', 'inputMapper.json', '1', referenceId, 'true', line
    ];
    if (debug) console.log('Executing: java ' + args.join(' '));

    // Instantiate Kafka Producer; it connects while the parser is running.
    client = new kafka.KafkaClient({kafkaHost:'edge-node-1.icover-services.com:6667'});
    const producer = new kafka.Producer(client);
    let producerReady = false;
    producer.on('ready', function () {
      producerReady = true;
      console.log('Producer is ready');
    });
    producer.on('error', function (err) {
      console.log('Producer is in error state');
      fail(err);
    });

    // Fetch the latest TOPIC_VALID offset and send the token to the caller.
    const replyWithToken = function () {
      const offset = new kafka.Offset(client);
      // Without a listener, an 'error' event on the Offset would be thrown.
      offset.on('error', fail);
      offset.fetch([{ topic: TOPIC_VALID, partition: 0, time: -1, maxNum: 1 }], function (err, data) {
        if (stopIfOver()) return;
        const offsets = !err && data && data[TOPIC_VALID] ? data[TOPIC_VALID]['0'] : undefined;
        if (!offsets || offsets.length === 0) {
          fail(err || new Error('Kafka Offset error'));
          return;
        }
        const latestOffset = Math.max(0, offsets[0] - 1);
        if (debug) console.log('latestOffset for TOPIC_VALID: ' + latestOffset);
        const token = 'token=' + encodeURIComponent(referenceId + "~" + latestOffset);
        finished = true;
        release();
        res.send(token);
      });
    };

    execFile('java', args, function (error, stdout, stderr) {
      if (stopIfOver()) return;
      if (error) {
        fail(error);
        return;
      }
      console.log('SingleParserApp: ' + stderr);
      if (!(stderr.startsWith('Processed') && stdout.startsWith(referenceId))) {
        fail(new Error('SingleParserApp error'));
        return;
      }
      const publish = function () {
        if (stopIfOver()) return;
        const payload = [{topic: TOPIC_MATCH, messages: stdout, partition: 0}];
        producer.send(payload, function (err, data) {
          if (debug) console.log('data:', data);
          if (stopIfOver()) return;
          const written = !err && data && data[TOPIC_MATCH] ? data[TOPIC_MATCH]['0'] : undefined;
          // Offset 0 is a valid result: it is the first message of a partition.
          if (!(written >= 0)) {
            fail(err || new Error('Kafka Producer error'));
            return;
          }
          replyWithToken();
        });
      };
      // Only send once the producer has reported that it is connected.
      if (producerReady) publish();
      else producer.once('ready', publish);
    });
  } catch (e) {
    fail(e);
  }
});
// Handle the result of the validation process, ie. the second call to repeat ad lib when timed out
/**
 * GET /validate?token=<RecordID~offset>
 *
 * Reads TOPIC_VALID starting at the offset carried by the token and answers
 * with the first message whose value starts with the RecordID. If none shows
 * up before the request timeout, the timeout handler answers and the client
 * is expected to call again with the same token.
 *
 * The Kafka consumer created for the request is closed as soon as the
 * response is finished or the connection is closed, so repeated polling does
 * not accumulate open consumers.
 */
app.get('/validate', function(req, res, next) {
  let latestOffset;
  let consumer = null;

  // Close the consumer (and its Kafka connection) at most once.
  function cleanup() {
    if (consumer) {
      const owned = consumer;
      consumer = null;
      owned.close(function () {});
    }
  }

  // Log the failure and answer 400 unless a response was already sent.
  function fail(err) {
    console.log(err);
    cleanup();
    if (!res.headersSent) res.status(400).send('error');
  }

  try {
    const token = req.query.token;
    if (debug) console.log('Token: ' + token);
    if (typeof token !== 'string') throw new Error('Missing token');
    // The offset follows the LAST '~', so a RecordID containing '~' survives.
    const separator = token.lastIndexOf('~');
    if (separator <= 0) throw new Error('Malformed token');
    const referenceId = token.slice(0, separator);
    latestOffset = parseInt(token.slice(separator + 1), 10);
    if (Number.isNaN(latestOffset) || latestOffset < 0) throw new Error('Malformed token');
    if (debug) console.log('Offset: ' + latestOffset);

    consumer = new kafka.Consumer(new kafka.KafkaClient({kafkaHost:'edge-node-1.icover-services.com:6667'}),
      [{topic: TOPIC_VALID, offset: latestOffset}],
      {autoCommit: false, fromOffset: true}
    );
    // Covers every way the request can end: our own reply, the timeout
    // handler's reply, or the client going away.
    res.on('finish', cleanup);
    res.on('close', cleanup);

    consumer.on('message', function(message) {
      if (res.headersSent) return;
      // Skip messages without a string value instead of crashing on them.
      if (typeof message.value === 'string' && message.value.startsWith(referenceId)) {
        res.send(message.value);
      }
    });
    consumer.on('error', fail);
    consumer.on('offsetOutOfRange', fail);
  } catch (e) {
    console.log(req.query, 'offset: ' + latestOffset);
    fail(e);
  }
});
// Start listening on port 5001 and log once the server is up.
app.listen(5001, () => {
  console.log('Icover API running on port 5001');
});
// The timeout handler is an error middleware, so it is registered after the routes.
app.use(haltOnTimedout);
/**
 * Express error middleware for request timeouts.
 *
 * - Request timed out and nothing sent yet: answer 408 'timeout'.
 * - Request timed out but headers already sent: pass the error on.
 * - Any other error: pass it on unchanged. Calling next() without the error
 *   would drop it and let the request continue as if nothing had failed.
 *
 * @param {Error} err Error passed down the middleware chain.
 * @param {object} req Request; `req.timedout` is read to detect a timeout.
 * @param {object} res Response.
 * @param {function} next Next middleware in the chain.
 */
function haltOnTimedout(err, req, res, next) {
  if (req.timedout !== true || res.headersSent) {
    next(err);
    return;
  }
  res.status(408).send('timeout');
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement