Advertisement
Guest User

Untitled

a guest
Mar 19th, 2019
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.09 KB | None | 0 0
  1. /* Copyright (C) 2018 Travelport. All rights reserved.
  2. * Maintains all the Bolt classes
  3. */
  4. package com.travelport.pnr.storm.bolt;
  5.  
  6. import com.google.common.collect.ImmutableMap;
  7. import com.travelport.pnr.constants.ConfigurationParams;
  8.  
  9. import java.text.ParseException;
  10. import java.text.SimpleDateFormat;
  11. import java.time.ZoneId;
  12. import java.time.ZonedDateTime;
  13. import java.util.Calendar;
  14. import java.util.Date;
  15. import java.util.List;
  16. import java.util.Map;
  17. import java.util.Optional;
  18. import java.util.TimeZone;
  19. import java.util.stream.Collectors;
  20.  
  21. import org.apache.storm.metric.api.MeanReducer;
  22. import org.apache.storm.metric.api.ReducedMetric;
  23. import org.apache.storm.task.OutputCollector;
  24. import org.apache.storm.task.TopologyContext;
  25. import org.apache.storm.topology.OutputFieldsDeclarer;
  26. import org.apache.storm.topology.base.BaseRichBolt;
  27. import org.apache.storm.tuple.Fields;
  28. import org.apache.storm.tuple.Tuple;
  29. import org.apache.storm.tuple.Values;
  30. import org.apache.storm.utils.TupleUtils;
  31. import org.slf4j.Logger;
  32. import org.slf4j.LoggerFactory;
  33.  
  34. import com.travelport.pnr.constants.ConfigurationParams;
  35. import com.travelport.pnr.elasticsearch.BulkIngestRequest;
  36. import com.travelport.pnr.elasticsearch.BulkIngestResponse;
  37. import com.travelport.pnr.elasticsearch.ElasticError;
  38. import com.travelport.pnr.elasticsearch.PNRIndexElasticSearchProcessor;
  39. import com.travelport.pnr.elasticsearch.PnrBatchHelper;
  40. import com.travelport.pnr.event.PnrEventMsg;
  41. import com.travelport.pnr.exception.ElasticSearchConnectionException;
  42. import com.travelport.pnr.util.PNRIndexBean;
  43. import com.travelport.pnr.util.PNRIndexStats;
  44. import com.travelport.pnr.util.PNRIndexStats.TimeBlock;
  45. import com.travelport.pnr.util.PNRIndexStrongAES;
  46.  
  47. /**
  48. * <h1>ElasticSearch processor Bolt!</h1>
  49. * <p>
  50. * The PNRIndexElasticSearchBoltJest class is responsible for indexing the JSON
  51. * documents into Elastic Search with prior Authentication &nbsp Authorization
  52. * in need to TravelPort Business
  53. *
  54. * @author abhijeet.chaudhury
  55. * @version 1.0
  56. * @Organisation Cognizant
  57. * @since 11/02/2016
  58. */
  59.  
  60. public class PNRIndexElasticSearchBoltJest extends BaseRichBolt {
  61.  
  62. private static final long serialVersionUID = 6102684522147111555L;
  63. private static final String METRICS_NAME="avg_ingest_time";
  64. private static final int DEF_METR_WIN=10;
  65.  
  66. private static final Logger LOG = LoggerFactory.getLogger(PNRIndexElasticSearchBoltJest.class);
  67.  
  68. private transient PNRIndexElasticSearchProcessor pnrIndexElasticSearchProcessor;
  69. private transient OutputCollector collector;
  70.  
  71. private String username;
  72. private String password;
  73. private String ip;
  74. private String port;
  75.  
  76. private int flushIntervalSecs;
  77. private transient PnrBatchHelper batchHelper;
  78. private transient ReducedMetric ingestTimeMetric;
  79.  
  80. public PNRIndexElasticSearchBoltJest(String username, String password, String ip, String port,int flushInvl) {
  81. this.username = username;
  82. this.password = password;
  83. this.ip = ip;
  84. this.port = port;
  85. this.flushIntervalSecs = flushInvl;
  86. }
  87.  
  88. /**
  89. * @author sandeep.gutha
  90. * Main method of bolt invoked each time tuple is received.
  91. * Processes tuples in the batch manner.
  92. * If batch is not full then it accumulates the tuples.
  93. * Batch is flushed if it's full or time is expired.
  94. */
  95. public void execute(Tuple input) {
  96. if (batchHelper.shouldHandle(input)) {
  97. recordStats(input);
  98. batchHelper.addTuple(input);
  99. }
  100. if (batchHelper.shouldFlush()) {
  101. flushTuples();
  102. }
  103. }
  104.  
  105. private void failAllTuples() {
  106. batchHelper.getTuples().forEach((tuple) -> collector.fail(tuple));
  107. batchHelper.clear();
  108. }
  109.  
  110. private void recordStats(Tuple input) {
  111. long receivedTime = System.currentTimeMillis();
  112. PNRIndexBean pnrIndexBean = extractPnrIndexBeanFromTuple(input);
  113. pnrIndexBean.getStats().setMapDur(new PNRIndexStats.TimeBlock(
  114. pnrIndexBean.getStats().getDcdDur().getStopTs(), receivedTime));
  115. }
  116.  
  117. /**
  118. * Mapping between the cores and timezone of the rdt date field
  119. */
  120. private Map<String, String> crsToTimezoneMap = ImmutableMap.of(
  121. "1G", "UTC",
  122. "1V", "America/Denver",
  123. "1P", "America/New_York",
  124. "1J", "America/New_York");
  125.  
  126. /**
  127. * Converts input date with timezone to UTC
  128. * @param input
  129. * @param inputTimezone
  130. * @return time in millis since epoch
  131. */
  132. private long convertTimeToUTC(Date input, String inputTimezone) {
  133. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
  134. String dtString = sdf.format(input);
  135. sdf.setTimeZone(TimeZone.getTimeZone(inputTimezone));
  136. try {
  137. return sdf.parse(dtString).getTime();
  138. } catch (ParseException e) {
  139. throw new IllegalArgumentException(e);
  140. }
  141. }
  142.  
  143. private long extractDateField(Tuple tuple) {
  144. PNRIndexBean pnrIndexBean = extractPnrIndexBeanFromTuple(tuple);
  145. String crs = pnrIndexBean.getRowKey().substring(0, 2).toUpperCase();
  146. String timezone = crsToTimezoneMap.getOrDefault(crs, "UTC");
  147. Date dt = Optional.ofNullable(pnrIndexBean.getBp08RDT())
  148. .orElseGet(() -> Optional.ofNullable(pnrIndexBean.getDpteTPFTS()).orElseGet(Date::new));
  149. return convertTimeToUTC(dt, timezone);
  150. }
  151.  
  152. private void flushTuples() {
  153. List<Tuple> tuples = batchHelper.getTuples();
  154.  
  155. List<BulkIngestRequest> requests = tuples.stream()
  156. .map(tuple -> new BulkIngestRequest(extractMessageFromTuple(tuple), extractIndexUrlFromTuple(tuple), extractDateField(tuple)))
  157. .collect(Collectors.toList());
  158. long startIngestTs = System.currentTimeMillis();
  159. try {
  160. List<BulkIngestResponse> responses = pnrIndexElasticSearchProcessor.batchIngest(requests);
  161. processResponses(responses,new TimeBlock(startIngestTs,System.currentTimeMillis()));
  162. } catch (ElasticSearchConnectionException e) {
  163. LOG.error("Error Occured while processing the ES Index", e);
  164. failAllTuples();
  165. }
  166. }
  167.  
  168. private void processResponses(List<BulkIngestResponse> responses,TimeBlock indDur) {
  169. List<Tuple> tuples = batchHelper.getTuples();
  170. int ackedCnt=0;
  171.  
  172. for (int i=0; i< responses.size() && i < tuples.size(); i++) {
  173. ackedCnt+=ackTuple(tuples.get(i),responses.get(i),indDur);
  174. }
  175. if ( ingestTimeMetric != null ) {
  176. ingestTimeMetric.update(indDur.timeDifference());
  177. }
  178. //fail all unmatched tuples
  179. for (int i = responses.size();i < tuples.size(); i++) {
  180. collector.fail(tuples.get(i));
  181. }
  182. LOG.info("Batch size: {}. Successfully processed {} out of {} requests",tuples.size(),ackedCnt, responses.size());
  183. batchHelper.clear();
  184. }
  185.  
  186. private int ackTuple(Tuple tuple, BulkIngestResponse response,TimeBlock indDur) {
  187. int result = 0;
  188. PNRIndexBean bean = extractPnrIndexBeanFromTuple(tuple);
  189. bean.getStats().setIndDur(indDur);
  190. long waitMs = bean.getStats().getIndDur().getStartTs().getTime() - bean.getStats().getMapDur().getStopTs().getTime();
  191. boolean hasError = false;
  192. if (response.getEsErrors().size() > 0) {
  193. hasError = true;
  194. ElasticError eerr = response.getEsErrors().get(0);
  195. //ignore version conflicts
  196. if (!eerr.getType().startsWith("version_conflict")) {
  197. LOG.error("{}\tWait\t{}\tPROC\t{}\t{}\t{}",extractIndexUrlFromTuple(tuple),waitMs,bean.getStats().getIndDur().timeDifference(),
  198. eerr.getType(),eerr.getReason());
  199. }
  200. } else {
  201. LOG.info("{}\tWait\t{}\tPROC\t{}",extractIndexUrlFromTuple(tuple),waitMs,
  202. bean.getStats().getIndDur().timeDifference());
  203. }
  204. collector.ack(tuple);
  205. result+=1;
  206. if (!hasError && isExecuteEvent(tuple)) {
  207. emitDownstreamEvent(tuple);
  208. }
  209.  
  210. return result;
  211. }
  212.  
  213. /**
  214. * This method will load all the ElasticSearch Configurations from the
  215. * properties file A High AES Encryption algorithm call is implemented to
  216. * load the values for security and connect to client using connection pool
  217. * for singleton connection
  218. *
  219. * @param stormConf
  220. * @param context
  221. * @param outCollector
  222. */
  223. public void prepare(Map stormConf, TopologyContext context,
  224. OutputCollector outCollector) {
  225. collector = outCollector;
  226.  
  227. PNRIndexStrongAES strongAes = new PNRIndexStrongAES(
  228. (String) stormConf.get(ConfigurationParams.ENC_KEY));
  229. username = strongAes.decrypt(username);
  230. password = strongAes.decrypt(password);
  231. String elasticCompleteUri = ConfigurationParams.HTTP_VAL + ConfigurationParams.COLON + ConfigurationParams.DOUBLE_FWD_SLASH + ip
  232. + ConfigurationParams.COLON + port + ConfigurationParams.FWD_SLASH;
  233.  
  234. pnrIndexElasticSearchProcessor = new PNRIndexElasticSearchProcessor(elasticCompleteUri, username, password);
  235.  
  236. int batchSize = Integer.parseInt((String)stormConf.get(ConfigurationParams.ES_BATCH_SIZE));
  237.  
  238. this.batchHelper = new PnrBatchHelper(batchSize);
  239. this.ingestTimeMetric = context.registerMetric(METRICS_NAME, new MeanReducer(), DEF_METR_WIN);
  240. }
  241.  
  242. protected void emitDownstreamEvent(Tuple input) {
  243. PNRIndexBean pnrIndxBean = extractPnrIndexBeanFromTuple(input);
  244. String docId = pnrIndxBean.getRowKey();
  245. String crs = docId.substring(0, 2);
  246. String recordLocator = docId.substring(2, docId.length() - 8).toUpperCase();
  247. String dt = docId.substring(docId.length() - 8);
  248. collector.emit(input, new Values(new PnrEventMsg(docId, recordLocator, crs,
  249. pnrIndxBean.getPnrVersion(), dt, pnrIndxBean.getPseudoCityCode())));
  250. }
  251.  
  252. @Override
  253. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  254. declarer.declare(new Fields("pnrEventMsg"));
  255. }
  256.  
  257. /**
  258. * Does the component config: adds tick interval into the config
  259. *
  260. * @return modified config
  261. */
  262. @Override
  263. public Map<String, Object> getComponentConfiguration() {
  264. return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
  265. }
  266.  
  267. private PNRIndexBean extractPnrIndexBeanFromTuple(Tuple tuple) {
  268. return (PNRIndexBean) tuple.getValue(0);
  269. }
  270.  
  271.  
  272. private String extractIndexUrlFromTuple(Tuple tuple) {
  273. return tuple.getString(2);
  274. }
  275.  
  276. private String extractMessageFromTuple(Tuple tuple) {
  277. return tuple.getString(1);
  278. }
  279.  
  280. private Boolean isExecuteEvent(Tuple tuple) {
  281. return tuple.getBoolean(3);
  282. }
  283.  
  284. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement