// Code style:
// Avoid those implicit calls; use semicolons, explicit returns and everything else needed to avoid confusion.

var parser = require('xml2json');
var async = require('async');
var fs = require('fs');
var process = require('process');
var moment = require('moment');

var mysql_singleton = require('./mysql_singleton');
var es_singleton = require('./es_singleton');
var logger = require('./winston_logger');


function put_mapping(es_set, callback) {
    logger.info("Entering put_mapping");
    const curcol_idx = es_set['index'];
    const curcol_doctype = es_set['type'];
    const query_create = es_set['query']['query_create'];

    var query_mapping = {
        'index': curcol_idx,
        'body': {}
    };
    query_mapping['body']['mappings'] = {};
    query_mapping['body']['mappings'][curcol_doctype] = query_create;

    es_singleton.getPool.indices.create(query_mapping,
        function (err, response, status) {
            if (err) { // err is an error object (or null), not a boolean
                logger.error("putMapping err=" + err);
            } else {
                logger.info("Index creation success");
            }
            callback(err);
            return;
        });
    return;
}

function check_exist(es_set, callback) {
    logger.info("Entering check_exist");
    const curcol_idx = es_set['index'];

    es_singleton.getPool.indices.exists({
        // 'local': false,
        'index': curcol_idx
    }, function (error, exists) {
        if (exists === true) {
            logger.info("Check success");
            callback(error);
        } else {
            // put_mapping invokes the callback itself; calling it here as well
            // would fire the callback twice.
            put_mapping(es_set, callback); // comment out to use auto mapping by ES
        }
        return; // end exists callback
    });
    return;
}

/**
 * Function that returns the number of stored documents and the last ID
 * */
function check_countsave(es_set, callback) {
    logger.info("Entering check_countsave");
    const curcol_idx = es_set['index'];
    const curcol_doctype = es_set['type'];
    const query_count = es_set['query']['query_count'];

    // data we want to calculate here
    var count_saved = 0;
    var last_data = 0;

    es_singleton.getPool.search({
        index: curcol_idx,
        type: curcol_doctype,
        body: query_count
    }, function (error, response) {
        //es_singleton.getPool.close();
        if (error) {
            logger.error('check_countsave, Err:' + error);
            fs.writeFile('debug/check_countsave_debug.json', JSON.stringify(error), FileSaveAck);
        }
        else {
            //logger.info('response='+JSON.stringify(response));
            var hit = response['hits'];
            count_saved = hit['total'];
            if (hit['hits'].length > 0) {
                last_data = hit['hits'][0]['_id'];
            }
            logger.info("count_saved=" + count_saved + " timestamp=" + last_data);
        }
        callback(error, {'count_saved': count_saved, 'last_data': last_data});
        return;
    });
    return;
}

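// The actual query_count lives in config.json ('es_query'/'query_count').
// A minimal sketch of a query that satisfies what the callback above reads from
// the response (hits.total plus hits.hits[0]._id as the last ID) -- an
// illustrative assumption, not the real config, and it assumes your ES version
// allows sorting on _id (older versions may require _uid instead):
//   { "size": 1, "sort": [ { "_id": { "order": "desc" } } ], "query": { "match_all": {} } }
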
/**
 * Calculate the number of batches needed given our nb_process, batch_min and batch_max.
 * Returns the number and size of the batches (parameters for the SQL query).
 * */
function calc_nbbatch(count_toAdd, batch_opt, callback) {
    var curcol_batch_min = batch_opt['min'];
    var curcol_batch_max = batch_opt['max'];
    const curcol_max_process = batch_opt['nb_process'];
    // what we want to calculate here
    var batch_size = 0;
    var nb_batch = 0;

    if (curcol_batch_min > curcol_batch_max) {
        logger.info('Since when is a min greater than a max?... swapping values, current min=' + curcol_batch_min + ' max=' + curcol_batch_max);
        curcol_batch_min = curcol_batch_min ^ curcol_batch_max; // XOR swap
        curcol_batch_max = curcol_batch_min ^ curcol_batch_max;
        curcol_batch_min = curcol_batch_min ^ curcol_batch_max;
        logger.info('min=' + curcol_batch_min + ' max=' + curcol_batch_max);
    }

    if (count_toAdd <= 0) {
        callback(null, {'nb_batch': nb_batch, 'batch_size': batch_size}); // nothing to do: 0 batches of size 0
        return;
    }

    batch_size = Math.ceil(count_toAdd / curcol_max_process); // aim for nb_process threads/processes only
    if (batch_size < curcol_batch_min) {
        batch_size = curcol_batch_min; // e.g. min 10K
    }
    if (batch_size > curcol_batch_max) {
        batch_size = curcol_batch_max; // e.g. max 1M
    }
    nb_batch = Math.ceil(count_toAdd / batch_size); // recalculate the number of thread batches

    callback(null, {'nb_batch': nb_batch, 'batch_size': batch_size});
    return;
}

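// Worked example (illustrative numbers, not taken from any real config):
// with count_toAdd = 150000, nb_process = 20, min = 10000, max = 1000000:
//   Math.ceil(150000 / 20) = 7500, which is below the min, so batch_size = 10000
//   nb_batch = Math.ceil(150000 / 10000) = 15
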
/**
 * Function that calculates how many data items (XML files) will have to be processed (added).
 * Also applies the regression strategies if the ID is not in auto-increment mode.
 * */
function calc_nbToAdd(count_set, callback) {
    const count_new = count_set['new'];
    const count_tot = count_set['tot'];
    const count_saved = count_set['saved'];

    const count_old = count_tot - count_new;
    const nb_unsync = count_old - count_saved;

    const count_toAdd = count_tot - count_old; // should be equal to count_new if the id only increases

    logger.info('count_new=' + count_new + ' count_old=' + count_old + ' nb_unsync=' + nb_unsync + ' count_toAdd=' + count_toAdd);
    // update our set
    count_set['old'] = count_old;
    count_set['unsync'] = nb_unsync;
    count_set['toAdd'] = count_toAdd;

    // Apply a check_unsync strategy??
    // placeholder in case it is needed

    callback(null, count_set);
    return;
}

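// Worked example (illustrative numbers): count_tot = 1000, count_new = 200 and
// count_saved = 750 give count_old = 800, nb_unsync = 800 - 750 = 50 old rows
// not yet mirrored in ES, and count_toAdd = 1000 - 800 = 200.
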
function FileSaveAck(err) {
    if (err) {
        logger.error('Couldn\'t save file');
    } else {
        logger.info('File saved');
    }
}

function parse_sub_recellState(rcell_state, psub_recellState_callback) {
    async.each(rcell_state, // loop through rcell_state (let's call them the final results)
        function (state, rcell_resul_task_callback) {
            if ('Result' in state) {
                var tmp = state['Result'];
                if (tmp === Object(tmp)) {
                    // logger.info('found:'+ JSON.stringify(state) );
                    state['Result'] = ''; // erase it in place to fix the type conflict (reassigning tmp alone would not touch the JSON)
                }
            }
            rcell_resul_task_callback();
            return;
        },
        function (err) { // all states looped, done
            if (err) { logger.error('rcell_res_final, Err:' + err); }
            psub_recellState_callback();
            return;
        }
    ); // end each rcell_res
    return;
}

function check_attribute(attribute, base, id, isarray) {
    const debug_fs_name = 'debug/checkattr_debug' + id + '_' + base + '_' + attribute + '.json';
    if (!(attribute in base)) {
        logger.error('check_attribute errors, "' + attribute + '" object not found in "' + base + '" for id=' + id);
        fs.writeFile(debug_fs_name, JSON.stringify(base), FileSaveAck);
        return null;
    }
    var ref_tmp = base[attribute];
    if (isarray === true) { // check that it is a non-empty array
        if (ref_tmp.length <= 0) {
            logger.error('check_attribute errors, "' + attribute + '" not an array or empty in "' + base + '" for id=' + id);
            fs.writeFile(debug_fs_name, JSON.stringify(ref_tmp), FileSaveAck);
            return null;
        }
    }
    return ref_tmp;
}

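// Usage example, as called from prepare_fill below:
//   var rcell_services = check_attribute('RecellServices', ref_json, id, true);
// returns ref_json['RecellServices'] when it exists and is a non-empty array,
// or null (after dumping a debug file) otherwise.
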
/**
 * Check the JSON fields and transform them so that ES accepts them.
 * This is done in an async manner on all fields.
 * Also adds the queries to be done into an array for es_bulk.
 * */
function prepare_fill(id, json, datas, curcol_idx, curcol_doctype, pcallback) {
    //logger.info('Entering rescall');
    const debug_fs_name = 'debug/prepare_fill_debug_' + id + '.json';

    logger.info('prepare_fill converting id=' + id);
    //logger.info('json ='+JSON.stringify(json));

    if (!('jav:RecellDesktopService' in json)) {
        if ('tns:PhoneTestService' in json) {
            logger.info('Found "tns:PhoneTestService" (unsupported yet, dumping...) for id=' + id);
            fs.writeFile('debug/prepare_fill_debug_tns_' + id + '.json', JSON.stringify(json), FileSaveAck);
        }
        else { // totally unhandled
            logger.error('Not supported json (basetype not "jav:RecellDesktopService") for id=' + id);
            fs.writeFile('debug/prepare_fill_debug_base_' + id + '.json', JSON.stringify(json), FileSaveAck);
        }
        pcallback();
        return;
    }
    var ref_json = json["jav:RecellDesktopService"][0];

    if ('Computer' in ref_json) {
        if ('Uptime' in ref_json['Computer'][0]) {
            if ('$t' in ref_json['Computer'][0]["Uptime"][0]) { // check the element itself, not its "$t" value
                var uptime = ref_json['Computer'][0]["Uptime"][0]["$t"];
                //if( isNaN(uptime) == true){
                if (typeof uptime != "number") {
                    logger.error('Normalizing Computer.Uptime (not a number) for id=' + id);
                    ref_json['Computer'][0]["Uptime"][0]["$t"] = 0; // normalize it in place
                }
            }
        }
    }

    var rcell_services = check_attribute('RecellServices', ref_json, id, true);
    if (rcell_services === null) {
        pcallback();
        return;
    }

    async.each(rcell_services,
        function (services, rcell_services_task_callback) { // loop through all services
            var ref_service = check_attribute('RecellService', services, id, true);
            if (ref_service === null) {
                rcell_services_task_callback();
                return;
            }
            async.each(ref_service,
                function (service, ref_service_task_callback) { // loop through each service
                    var rcell_states = check_attribute('RecellStates', service, id, true);
                    if (rcell_states === null) {
                        ref_service_task_callback();
                        return;
                    }
                    async.each(rcell_states, // loop through all states
                        function (state, rcell_states_task_callback) {
                            var ref_state = check_attribute('RecellState', state, id, true);
                            if (ref_state === null) {
                                rcell_states_task_callback();
                                return;
                            }
                            parse_sub_recellState(ref_state, rcell_states_task_callback);
                            return;
                        },
                        function (err) { // all states looped, done
                            if (err) { logger.info('rcell_states_final, Err:' + err); }
                            ref_service_task_callback();
                            return;
                        }
                    ); // end each rcell_states
                    return;
                },
                function (err) { // each service looped, done
                    if (err) { logger.info('ref_service_final, Err:' + err); }
                    rcell_services_task_callback();
                    return;
                }
            );
            return; // end ref_service
        },
        function (err) { // all services looped, done
            if (err) { logger.info('rcell_services_final, Err:' + err); }
            else { // add our modified json into datas for the bulk
                datas.push({ index: { _index: curcol_idx, _type: curcol_doctype, _id: id } });
                datas.push(json);
            }
            pcallback();
            return;
        }
    );

    return;
}

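// Shape of the JSON this function expects (reconstructed from the accesses
// above; the element names come from the code, and the nested single-element
// arrays are how xml2json renders repeated XML elements):
// {
//   "jav:RecellDesktopService": [ {
//     "Computer": [ { "Uptime": [ { "$t": 12345 } ] } ],
//     "RecellServices": [ {
//       "RecellService": [ {
//         "RecellStates": [ { "RecellState": [ { "Result": "..." } ] } ]
//       } ]
//     } ]
//   } ]
// }
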
function Es_BulkAck(err, resp) {
    if (err) {
        logger.error('Es_bulk error=' + err);
    }
    else {
        if (resp['errors'] == true) {
            logger.error('Es_bulk error, dumping response_json');
            fs.appendFile('debug/Es_bulk.json', '\n' + JSON.stringify(resp), FileSaveAck);
            // fs.writeFile('debug/Es_bulk.json', JSON.stringify(resp), FileSaveAck);
        }
    }
    return;
}

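// The bulk body built in prepare_fill alternates an action line and its
// document, which is the array format the ES bulk API expects:
//   [ { index: { _index: 'myindex', _type: 'mytype', _id: 42 } },
//     { 'jav:RecellDesktopService': [ /* ... */ ] },
//     /* next action/document pair... */ ]
// (the index/type/id values here are illustrative)
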
function ResultSQL_ACK(err) {
    if (err) {
        logger.error('ResultSQL_ACK error=' + err);
    }
    else {
        logger.info('All result SQL treated');
    }
    return;
}

/**
 * Function to grab the data from MySQL, transform it and fill it into ES.
 * (@TODO split me)
 * */
function fetch_data(connection, idx_b, last_data_set, fetch_callback) {
    const nb_batch = last_data_set['nb_batch']; // avoid dereferencing inside the loop

    const batch_size = last_data_set['batch_size'];
    const xml_opt = last_data_set['xml_opt'];
    const curcol_idx = last_data_set['curcol_idx'];
    const curcol_doctype = last_data_set['curcol_doctype'];
    const last_id = last_data_set['last_data'];

    const sql = 'select id,xml from record where id > ' + last_id + ' LIMIT ' + batch_size + ' OFFSET ' + (idx_b * batch_size);

    if (nb_batch <= 0) {
        return;
    }

    logger.info('Entering fetch_data');
    //logger.info('batch_set='+JSON.stringify(batch_set));
    logger.info('x:' + idx_b + '/' + nb_batch + ', querying =' + sql);

    connection.query(sql, [], function (err, results) {
        if (idx_b === nb_batch - 1) {
            connection.release(); // always put the connection back in the pool after the last query
            //es_singleton.getPool.close();
            fetch_callback(err, 'done');
        }
        else // if (idx_b < 3)
        { // start the next query in the meantime, gogogo!
            fetch_data(connection, ++idx_b, last_data_set, fetch_callback);
        }

        if (err) {
            logger.error(err);
            return;
        }
        if (results.length <= 0) {
            return;
        }

        var datas = [];
        async.each(results,
            function (res, callback) { // 2nd param is the function that each item is passed to
                //logger.info(results);
                const id = res['id'];
                const xmlText = res['xml'];
                var json = parser.toJson(xmlText, xml_opt); // returns a string by default; xml_opt should request an object, since prepare_fill treats it as one
                //logger.info(json);
                prepare_fill(id, json, datas, curcol_idx, curcol_doctype, callback);
                return; // end task callback
            },
            function (err) { // 3rd param is the function to call when everything is done
                if (err) {
                    logger.error('Error in parsing sql_result=' + sql + ' err:' + err);
                    fetch_callback();
                    return;
                }

                //logger.info('ES_CALL, data='+JSON.stringify(datas));
                if (datas.length > 0) {
                    es_singleton.getPool.bulk({
                        body: datas
                    }, Es_BulkAck);
                }
                ResultSQL_ACK(err);
                // do not call fetch_callback here!
                return; // end async.each final callback
            }
        ); // end async.each
        return; // end query_select callback
    }); // end query_sql select id,xml
    return;
}

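// Example of the generated SQL (illustrative values): with last_id = 1500,
// batch_size = 10000 and idx_b = 2 the query becomes
//   select id,xml from record where id > 1500 LIMIT 10000 OFFSET 20000
// The interpolated values are numeric and come from ES/config, but a
// parameterized query would still be the safer habit.
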
/**
 * Function to load the configuration from a json file
 * */
function load_config(callback) {
    var json_config;
    fs.readFile('config.json', {'encoding': 'utf8', 'flag': 'r'}, function (err, data) {
        if (err) logger.error('load_config, fail to open file');
        else {
            logger.info('Config loaded');
            json_config = JSON.parse(data); // assign to the outer var (a const here would shadow it and the callback would receive undefined)
        }
        callback(err, json_config);
        return;
    });
    return;
}

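// A minimal config.json sketch matching the keys read in main below. Only the
// key names come from this code; all values are illustrative assumptions:
// {
//   "xml_convert_options": { "object": true },
//   "collectors": [ { "index": "test_npm", "type": "test_npm" } ],
//   "batch_opt": { "min": 10000, "max": 1000000, "nb_process": 20 },
//   "es_query": { "query_create": { ... }, "query_count": { ... } }
// }
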
/**
 * Simple function that reschedules our main task to run at an interval.
 * NB: we do not use setInterval because we only want to fire again once the current
 * collecting process has ended, otherwise we would end up with lots of competing processes...
 * */
function reschedule() {
    //setTimeout(main, 1000*60*2); // call us again
    logger.info('Asking to reschedule'); // informational, not an error
    return;
}

/**
 * General process (main):
 * Check if es_index exists => create it if not
 * Get the number of documents in es_index
 * Get the number of rows in mysql
 * Compare it with the number of documents in ES
 * Calculate nb_batch
 * Fetch and fill the data
 * Wait for the next clock
 * */
function main() {
    load_config(function (err, json_config) {
        if (err) { reschedule(); return; }
        //logger.info('config='+JSON.stringify(json_config));

        const xml_convert_opt = json_config['xml_convert_options'];

        const curcol_idx = json_config["collectors"][0]["index"]; //@TODO loop on me when ready!
        const curcol_doctype = json_config["collectors"][0]["type"]; //@TODO loop on me when ready!
        const batch_opt = json_config['batch_opt']; // better to have batch settings per collector as data size may vary, but well...

        const es_set = {
            'query': json_config['es_query'],
            'index': curcol_idx,
            'type': curcol_doctype
        };
        //logger.info('xml_convert_opt='+JSON.stringify(xml_convert_opt));
        //return;

        check_exist(es_set, function (err) {

            check_countsave(es_set, function (err, results) {
                const count_saved = results['count_saved'];
                const last_data = results['last_data'];

                mysql_singleton.getPool.getConnection(function (err, connection) {
                    if (err) { logger.error(err); reschedule(); return; }

                    const TimeStart = moment();

                    const sql = "select count(1) from record";
                    connection.query(sql, [], function (err, results) {
                        if (err) { logger.error(err); return; }
                        const count_tot = results[0]['count(1)'];

                        logger.info('Count_tot=' + count_tot);
                        if (count_tot <= count_saved) {
                            logger.info("data already loaded, finishing...");
                            reschedule(); return;
                        }

                        var sql = "select count(1) from record where id > " + last_data;
                        connection.query(sql, [], function (err, results) {
                            if (err) { logger.error(err); reschedule(); return; }

                            async.waterfall([
                                function (callback) { // calc_nbToAdd
                                    const count_new = results[0]['count(1)'];
                                    var count_set = { // ensemble of counts (will be altered in calc_nbToAdd)
                                        'new': count_new,
                                        'tot': count_tot,
                                        'saved': count_saved
                                    };
                                    calc_nbToAdd(count_set, callback);
                                    return;
                                },
                                function (count_set, callback) { // calc_nbbatch
                                    const count_toAdd = count_set['toAdd'];
                                    calc_nbbatch(count_toAdd, batch_opt, callback); // batch_set is passed on through the callback
                                    return;
                                },
                                function (batch_set, callback) { // fetch_data
                                    var idx_b = 0; // batch loop index
                                    logger.info('batch_set=' + JSON.stringify(batch_set));

                                    const last_data_set = {
                                        'nb_batch': batch_set['nb_batch'],
                                        'batch_size': batch_set['batch_size'],
                                        'xml_opt': xml_convert_opt,
                                        'curcol_idx': curcol_idx, // was hardcoded to the 'test_npm' test index, which would desync from check_exist/check_countsave
                                        'curcol_doctype': curcol_doctype,
                                        'last_data': last_data
                                    };
                                    fetch_data(connection, idx_b, last_data_set, callback);
                                    return;
                                }
                            ], function (err, result) { // final
                                var TimeStop = moment();
                                const minDiff = TimeStop.diff(TimeStart, 'minutes');
                                logger.info('All waterfall is done, took minDiff=' + minDiff);
                                reschedule(); // call us again
                                return;
                            });
                            return; // end query_sql count_new callback
                        }); // end query_sql count_new
                        return; // end query_sql count callback
                    }); // end query_sql count_tot

                    return; // end sql_con callback
                }); // end sql_con
                return; // end check_countsave callback
            });
            return; // end exist callback
        });
        return; // end load_config callback
    }); // end load_config

    return;
}

// Entry point:
setTimeout(main, 3000); // Wait a bit for initialisation, then go!