// Code style:
// do not rely on implicit semicolon insertion; use explicit semicolons,
// explicit returns and everything else needed to avoid confusion.
var parser = require('xml2json');
var async = require('async');
var fs = require('fs');
var process = require('process');
var moment = require('moment');
var mysql_singleton = require('./mysql_singleton');
var es_singleton = require('./es_singleton');
var logger = require('./winston_logger');
function put_mapping(es_set, callback) {
    logger.info("Entering put_mapping");
    const curcol_idx = es_set['index'];
    const curcol_doctype = es_set['type'];
    const query_create = es_set['query']['query_create'];
    var query_mapping = {
        'index': curcol_idx,
        'body': {}
    };
    query_mapping['body']['mappings'] = {};
    query_mapping['body']['mappings'][curcol_doctype] = query_create;
    es_singleton.getPool.indices.create(query_mapping, function (err, response, status) {
        if (err) { // err is an Error object when set, never the boolean true
            logger.error("putMapping err=" + err);
        } else {
            logger.info("Index creation success");
        }
        callback(err);
        return;
    });
    return;
}
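// Illustrative sketch (not part of the original script): for a collector
// configured with index 'test_npm', type 'test_npm' and a hypothetical
// query_create of { properties: { Uptime: { type: 'long' } } }, the request
// built above is equivalent to:
//
//   es_singleton.getPool.indices.create({
//       index: 'test_npm',
//       body: { mappings: { test_npm: { properties: { Uptime: { type: 'long' } } } } }
//   }, callback);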
function check_exist(es_set, callback) {
    logger.info("Entering check_exist");
    const curcol_idx = es_set['index'];
    es_singleton.getPool.indices.exists({
        // 'local': false,
        'index': curcol_idx
    }, function (error, exists) {
        if (exists === true) {
            logger.info("Check success");
            callback(error);
        } else {
            // put_mapping invokes callback itself; calling callback(error)
            // here as well would fire the continuation twice.
            put_mapping(es_set, callback); // comment out to use auto mapping by ES
        }
        return; // end exists callback
    });
    return;
}
/**
 * Returns the number of stored documents and the ID of the last one.
 * */
function check_countsave(es_set, callback) {
    logger.info("Entering check_countsave");
    const curcol_idx = es_set['index'];
    const curcol_doctype = es_set['type'];
    const query_count = es_set['query']['query_count'];
    // data we want to compute here
    var count_saved = 0;
    var last_data = 0;
    es_singleton.getPool.search({
        index: curcol_idx,
        type: curcol_doctype,
        body: query_count
    }, function (error, response) {
        //es_singleton.getPool.close();
        if (error) {
            logger.error('check_countsave, Err:' + error);
            fs.writeFile('debug/check_countsave_debug.json', JSON.stringify(error), FileSaveAck);
        } else {
            //logger.info('response='+JSON.stringify(response));
            var hit = response['hits'];
            count_saved = hit['total'];
            if (hit['hits'].length > 0) {
                last_data = hit['hits'][0]['_id'];
            }
            logger.info("count_saved=" + count_saved + " timestamp=" + last_data);
        }
        callback(error, { 'count_saved': count_saved, 'last_data': last_data });
        return;
    });
    return;
}
/**
 * Computes the number of batches needed from nb_process, batch_min and batch_max.
 * Returns the number of batches and the batch size (parameters for the SQL query).
 * */
function calc_nbbatch(count_toAdd, batch_opt, callback) {
    var curcol_batch_min = batch_opt['min'];
    var curcol_batch_max = batch_opt['max'];
    const curcol_max_process = batch_opt['nb_process'];
    // what we want to compute here
    var batch_size = 0;
    var nb_batch = 0;
    if (curcol_batch_min > curcol_batch_max) {
        logger.info('Since when is a min greater than a max?... swapping values, current min=' + curcol_batch_min + ' max=' + curcol_batch_max);
        var tmp_swap = curcol_batch_min;
        curcol_batch_min = curcol_batch_max;
        curcol_batch_max = tmp_swap;
        logger.info('min=' + curcol_batch_min + ' max=' + curcol_batch_max);
    }
    if (count_toAdd <= 0) {
        // nothing to add: zero batches of size zero
        callback(null, { 'nb_batch': nb_batch, 'batch_size': batch_size });
        return;
    }
    batch_size = Math.ceil(count_toAdd / curcol_max_process); // try to run nb_process threads/processes only
    if (batch_size < curcol_batch_min) {
        batch_size = curcol_batch_min; // e.g. min 10K
    }
    if (batch_size > curcol_batch_max) {
        batch_size = curcol_batch_max; // e.g. max 1M
    }
    nb_batch = Math.ceil(count_toAdd / batch_size); // recompute the number of batches
    callback(null, { 'nb_batch': nb_batch, 'batch_size': batch_size });
    return;
}
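// Worked example (illustrative numbers, not from the original script):
// with count_toAdd = 100000, nb_process = 20, min = 10000 and max = 1000000,
// the raw batch_size is ceil(100000 / 20) = 5000, which is clamped up to the
// minimum of 10000, giving nb_batch = ceil(100000 / 10000) = 10 batches.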
/**
 * Computes how many data items (xml files) must be processed (added).
 * Also applies the regression strategies if the ID is not auto-increment.
 * */
function calc_nbToAdd(count_set, callback) {
    const count_new = count_set['new'];
    const count_tot = count_set['tot'];
    const count_saved = count_set['saved'];
    const count_old = count_tot - count_new;
    const nb_unsync = count_old - count_saved;
    const count_toAdd = count_tot - count_old; // should equal count_new if IDs only increase
    logger.info('count_new=' + count_new + ' count_old=' + count_old + ' nb_unsync=' + nb_unsync + ' count_toAdd=' + count_toAdd);
    // update our set
    count_set['old'] = count_old;
    count_set['unsync'] = nb_unsync;
    count_set['toAdd'] = count_toAdd;
    // Apply a check_unsync strategy ??
    // placeholder in case it is needed
    callback(null, count_set);
    return;
}
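// Worked example (illustrative numbers, not from the original script):
// with count_tot = 1000, count_new = 100 and count_saved = 880,
// count_old = 1000 - 100 = 900, nb_unsync = 900 - 880 = 20 rows stored in
// MySQL but missing from ES, and count_toAdd = 1000 - 900 = 100 = count_new.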
function FileSaveAck(err) {
    if (err) {
        logger.error('Couldn\'t save file');
    } else {
        logger.info('File saved');
    }
}
function parse_sub_recellState(rcell_state, psub_recellState_callback) {
    async.each(rcell_state, // loop through rcell_state (call them final results)
        function (state, rcell_resul_task_callback) {
            if ('Result' in state) {
                var tmp = state['Result'];
                if (tmp === Object(tmp)) { // Result is an object, not a scalar
                    // logger.info('found:'+ JSON.stringify(item) );
                    state['Result'] = ''; // erase it in place to fix the ES type conflict
                }
            }
            rcell_resul_task_callback();
            return;
        },
        function (err) { // all states looped, done
            if (err) { logger.error('rcell_res_final, Err:' + err); }
            psub_recellState_callback();
            return;
        }
    ); // end each rcell_res
    return;
}
function check_attribute(attribute, base, id, isarray) {
    const debug_fs_name = 'debug/checkattr_debug' + id + '_' + base + '_' + attribute + '.json';
    if (!(attribute in base)) {
        logger.error('check_attribute errors, "' + attribute + '" object not found in "' + base + '" for id=' + id);
        fs.writeFile(debug_fs_name, JSON.stringify(base), FileSaveAck);
        return null;
    }
    var ref_tmp = base[attribute];
    if (isarray === true) { // check that it really is a non-empty array
        if (!Array.isArray(ref_tmp) || ref_tmp.length <= 0) {
            logger.error('check_attribute errors, "' + attribute + '" not an array or empty in "' + base + '" for id=' + id);
            fs.writeFile(debug_fs_name, JSON.stringify(ref_tmp), FileSaveAck);
            return null;
        }
    }
    return ref_tmp;
}
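// Typical usage, as in prepare_fill below: fetch a mandatory array attribute
// and bail out of the current async task when it is missing or malformed.
//
//   var rcell_states = check_attribute('RecellStates', service, id, true);
//   if (rcell_states === null) {
//       task_callback(); // skip this document, the error was already dumped to debug/
//       return;
//   }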
/**
 * Checks JSON fields and transforms them so that ES accepts them.
 * This is done in an async manner on all fields.
 * Also pushes the query to be run into an array for es_bulk.
 * */
function prepare_fill(id, json, datas, curcol_idx, curcol_doctype, pcallback) {
    //logger.info('Entering rescall');
    const debug_fs_name = 'debug/prepare_fill_debug_' + id + '.json';
    logger.info('prepare_fill converting id=' + id);
    //logger.info('json ='+JSON.stringify(json));
    if (!('jav:RecellDesktopService' in json)) {
        if ('tns:PhoneTestService' in json) {
            logger.info('Found "tns:PhoneTestService", (unsupported yet, dumping... for id=' + id);
            fs.writeFile('debug/prepare_fill_debug_tns_' + id + '.json', JSON.stringify(json), FileSaveAck);
        } else { // totally unhandled
            logger.error('Not supported json (basetype not "jav:RecellDesktopService") for id=' + id);
            fs.writeFile('debug/prepare_fill_debug_base_' + id + '.json', JSON.stringify(json), FileSaveAck);
        }
        pcallback();
        return;
    }
    var ref_json = json["jav:RecellDesktopService"][0];
    if ('Computer' in ref_json) {
        if ('Uptime' in ref_json['Computer'][0]) {
            if ('$t' in ref_json['Computer'][0]["Uptime"][0]) {
                var uptime = ref_json['Computer'][0]["Uptime"][0]["$t"];
                //if( isNaN(uptime) == true){
                if (typeof uptime != "number") {
                    logger.error('Normalizing Computer.Uptime for id=' + id);
                    ref_json['Computer'][0]["Uptime"][0]["$t"] = 0; // normalize it in place
                }
            }
        }
    }
    var rcell_services = check_attribute('RecellServices', ref_json, id, true);
    if (rcell_services === null) {
        pcallback();
        return;
    }
    async.each(rcell_services,
        function (services, rcell_services_task_callback) { // loop through all services
            var ref_service = check_attribute('RecellService', services, id, true);
            if (ref_service === null) {
                rcell_services_task_callback();
                return;
            }
            async.each(ref_service,
                function (service, ref_service_task_callback) { // loop through each service
                    var rcell_states = check_attribute('RecellStates', service, id, true);
                    if (rcell_states === null) {
                        ref_service_task_callback();
                        return;
                    }
                    async.each(rcell_states, // loop through all states
                        function (state, rcell_states_task_callback) {
                            var ref_state = check_attribute('RecellState', state, id, true);
                            if (ref_state === null) {
                                rcell_states_task_callback();
                                return;
                            }
                            parse_sub_recellState(ref_state, rcell_states_task_callback);
                            return;
                        },
                        function (err) { // all states looped, done
                            if (err) { logger.error('rcell_states_final, Err:' + err); }
                            ref_service_task_callback();
                            return;
                        }
                    ); // end each rcell_states
                    return;
                },
                function (err) { // each service looped, done
                    if (err) { logger.error('ref_service_final, Err:' + err); }
                    rcell_services_task_callback();
                    return;
                }
            );
            return; // end ref_service
        },
        function (err) { // all services looped, done
            if (err) { logger.error('rcell_services_final, Err:' + err); }
            else { // add our modified json into datas for the bulk call
                datas.push({ index: { _index: curcol_idx, _type: curcol_doctype, _id: id } });
                datas.push(json);
            }
            pcallback();
            return;
        }
    );
    return;
}
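// The datas array built above follows the ES bulk format: an action line
// followed by its document, repeated. Illustrative shape (hypothetical values):
//
//   [
//       { index: { _index: 'test_npm', _type: 'test_npm', _id: 42 } },
//       { 'jav:RecellDesktopService': [ /* converted xml document */ ] },
//       ...
//   ]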
function Es_BulkAck(err, resp) {
    if (err) {
        logger.error('Es_bulk error=' + err);
    } else {
        if (resp['errors'] === true) { // the bulk call succeeded but some items failed
            logger.error('Es_bulk error, dumping response_json');
            fs.appendFile('debug/Es_bulk.json', '\n' + JSON.stringify(resp), FileSaveAck);
            // fs.writeFile('debug/Es_bulk.json', JSON.stringify(resp), FileSaveAck);
        }
    }
    return;
}
function ResultSQL_ACK(err) {
    if (err) {
        logger.error('ResultSQL_ACK error=' + err);
    } else {
        logger.info('All SQL results processed');
    }
    return;
}
/**
 * Grabs the data from MySQL, transforms it and fills it into ES.
 * (@TODO split me)
 * */
function fetch_data(connection, idx_b, last_data_set, fetch_callback) {
    const nb_batch = last_data_set['nb_batch']; // avoid repeated dereferences in the loop
    const batch_size = last_data_set['batch_size'];
    const xml_opt = last_data_set['xml_opt'];
    const curcol_idx = last_data_set['curcol_idx'];
    const curcol_doctype = last_data_set['curcol_doctype'];
    const last_id = last_data_set['last_data'];
    // parameterized query: let the driver escape the values
    const sql = 'select id,xml from record where id > ? LIMIT ? OFFSET ?';
    if (nb_batch <= 0) {
        fetch_callback(null, 'done'); // nothing to fetch, still unblock the waterfall
        return;
    }
    logger.info('Entering fetch_data');
    logger.info('x:' + idx_b + '/' + nb_batch + ', querying =' + sql);
    connection.query(sql, [last_id, batch_size, idx_b * batch_size], function (err, results) {
        if (idx_b === nb_batch - 1) {
            connection.release(); // always put the connection back in the pool after the last query
            //es_singleton.getPool.close();
            fetch_callback(err, 'done');
        } else {
            // kick off the next batch query in the meantime
            fetch_data(connection, ++idx_b, last_data_set, fetch_callback);
        }
        if (err) {
            logger.error(err);
            return;
        }
        if (results.length <= 0) {
            return;
        }
        var datas = [];
        async.each(results,
            function (res, callback) { // 2nd param is the function that each item is passed to
                //logger.info(results);
                const id = res['id'];
                const xmlText = res['xml'];
                var json = parser.toJson(xmlText, xml_opt); // returns a string by default; xml_opt must set object:true for prepare_fill
                //logger.info(json);
                prepare_fill(id, json, datas, curcol_idx, curcol_doctype, callback);
                return; // end task callback
            },
            function (err) { // 3rd param is the function to call when everything is done
                if (err) {
                    logger.error('Error in parsing sql_result=' + sql + ' err:' + err);
                    fetch_callback();
                    return;
                }
                //logger.info('ES_CALL, data='+JSON.stringify(datas));
                if (datas.length > 0) {
                    es_singleton.getPool.bulk({
                        body: datas
                    }, Es_BulkAck);
                }
                ResultSQL_ACK(err);
                // do not call fetch_callback here!
                return; // end async.each final callback
            }
        ); // end async.each
        return; // end query_select callback
    }); // end query_sql select id,xml
    return;
}
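// Paging scheme (illustrative numbers, not from the original script): with
// batch_size = 10000 and last_id = 0, batch 0 queries rows with OFFSET 0,
// batch 1 with OFFSET 10000, and so on; batches overlap in time because the
// next fetch_data call is issued as soon as the previous query returns.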
/**
 * Loads the configuration from a json file.
 * */
function load_config(callback) {
    var json_config;
    fs.readFile('config.json', { 'encoding': 'utf8', 'flag': 'r' }, function (err, data) {
        if (err) logger.error('load_config, fail to open file');
        else {
            logger.info('Config loaded');
            // assign to the outer variable; a shadowing const here would
            // leave the callback below with undefined
            json_config = JSON.parse(data);
        }
        callback(err, json_config);
        return;
    });
    return;
}
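// Sketch of the expected config.json shape, reconstructed from the keys read
// in main() below; the field values are hypothetical placeholders:
//
//   {
//       "xml_convert_options": { "object": true, "arrayNotation": true },
//       "collectors": [ { "index": "test_npm", "type": "test_npm" } ],
//       "batch_opt": { "min": 10000, "max": 1000000, "nb_process": 20 },
//       "es_query": { "query_create": { ... }, "query_count": { ... } }
//   }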
/**
 * Simple function that reschedules our main task to run again after an interval.
 * NB: we do not use setInterval since we only want to fire once the current
 * collecting run has ended, otherwise we would end up with many competing processes...
 * */
function reschedule() {
    //setTimeout(main, 1000*60*2); //call us again
    logger.info('Asking to reschedule');
    return;
}
/**
 * General process (main):
 * Check if es_index exists => create it if not
 * Get the number of documents in es_index
 * Get the number of rows in mysql
 * Compare with the number of documents in ES
 * Compute nb_batch
 * Fetch and fill the data
 * Wait for the next clock
 * */
function main() {
    load_config(function (err, json_config) {
        if (err) { reschedule(); return; }
        //logger.info('config='+JSON.stringify(json_config));
        const xml_convert_opt = json_config['xml_convert_options'];
        const curcol_idx = json_config["collectors"][0]["index"]; //@TODO loop on me when ready!
        const curcol_doctype = json_config["collectors"][0]["type"]; //@TODO loop on me when ready!
        const batch_opt = json_config['batch_opt']; // better to have batch settings per collector since data size may vary, but well...
        const es_set = {
            'query': json_config['es_query'],
            'index': curcol_idx,
            'type': curcol_doctype
        };
        //logger.info('xml_convert_opt='+JSON.stringify(xml_convert_opt));
        check_exist(es_set, function (err) {
            if (err) { logger.error(err); reschedule(); return; }
            check_countsave(es_set, function (err, results) {
                if (err) { reschedule(); return; }
                const count_saved = results['count_saved'];
                const last_data = results['last_data'];
                mysql_singleton.getPool.getConnection(function (err, connection) {
                    if (err) { logger.error(err); reschedule(); return; }
                    const TimeStart = moment();
                    const sql = "select count(1) from record";
                    connection.query(sql, [], function (err, results) {
                        if (err) { logger.error(err); return; }
                        const count_tot = results[0]['count(1)'];
                        logger.info('Count_tot=' + count_tot);
                        if (count_tot <= count_saved) {
                            logger.info("data already loaded, finishing...");
                            reschedule(); return;
                        }
                        var sql_new = "select count(1) from record where id > ?";
                        connection.query(sql_new, [last_data], function (err, results) {
                            if (err) { logger.error(err); reschedule(); return; }
                            async.waterfall([
                                function (callback) { // calc_nbToAdd
                                    const count_new = results[0]['count(1)'];
                                    var count_set = { // ensemble of counts (will be altered in calc_nbToAdd)
                                        'new': count_new,
                                        'tot': count_tot,
                                        'saved': count_saved
                                    };
                                    calc_nbToAdd(count_set, callback);
                                    return;
                                },
                                function (count_set, callback) { // calc_nbbatch
                                    const count_toAdd = count_set['toAdd'];
                                    calc_nbbatch(count_toAdd, batch_opt, callback); // the result goes through the callback, not a return value
                                    return;
                                },
                                function (batch_set, callback) { // fetch_data
                                    var idx_b = 0; // batch loop index
                                    logger.info('batch_set=' + JSON.stringify(batch_set));
                                    const last_data_set = {
                                        'nb_batch': batch_set['nb_batch'],
                                        'batch_size': batch_set['batch_size'],
                                        'xml_opt': xml_convert_opt,
                                        'curcol_idx': curcol_idx, // was hardcoded to 'test_npm'
                                        'curcol_doctype': curcol_doctype, // was hardcoded to 'test_npm'
                                        'last_data': last_data
                                    };
                                    fetch_data(connection, idx_b, last_data_set, callback);
                                    return;
                                }
                            ], function (err, result) { // final
                                var TimeStop = moment();
                                const minDiff = TimeStop.diff(TimeStart, 'minutes');
                                logger.info('All waterfall is done, took minDiff=' + minDiff);
                                reschedule(); // call us again
                                return;
                            });
                            return; // end query_sql count_new callback
                        }); // end query_sql count_new
                        return; // end query_sql count_tot callback
                    }); // end query_sql count_tot
                    return; // end sql_con callback
                }); // end sql_con
                return; // end check_countsave callback
            });
            return; // end check_exist callback
        });
        return; // end load_config callback
    }); // end load_config
    return;
}
//Entry point:
setTimeout(main, 3000); // wait a bit for initialization, then go!