Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /* Copyright (C) 2018 Travelport. All rights reserved.
- * Maintains all the Bolt classes
- */
- package com.travelport.pnr.storm.bolt;
import com.google.common.collect.ImmutableMap;
import com.travelport.pnr.constants.ConfigurationParams;
import com.travelport.pnr.elasticsearch.BulkIngestRequest;
import com.travelport.pnr.elasticsearch.BulkIngestResponse;
import com.travelport.pnr.elasticsearch.ElasticError;
import com.travelport.pnr.elasticsearch.PNRIndexElasticSearchProcessor;
import com.travelport.pnr.elasticsearch.PnrBatchHelper;
import com.travelport.pnr.event.PnrEventMsg;
import com.travelport.pnr.exception.ElasticSearchConnectionException;
import com.travelport.pnr.util.PNRIndexBean;
import com.travelport.pnr.util.PNRIndexStats;
import com.travelport.pnr.util.PNRIndexStats.TimeBlock;
import com.travelport.pnr.util.PNRIndexStrongAES;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * <h1>ElasticSearch processor Bolt</h1>
 * <p>
 * The PNRIndexElasticSearchBoltJest class is responsible for indexing JSON
 * documents into Elasticsearch, performing authentication and authorization
 * first, as required by Travelport's business.
 *
 * @author abhijeet.chaudhury
 * @version 1.0
 * @Organisation Cognizant
 * @since 11/02/2016
 */
public class PNRIndexElasticSearchBoltJest extends BaseRichBolt {

    private static final long serialVersionUID = 6102684522147111555L;
    // Name under which the mean-ingest-time metric is registered with Storm.
    private static final String METRICS_NAME = "avg_ingest_time";
    // Metric time-bucket window, in seconds.
    private static final int DEF_METR_WIN = 10;
    private static final Logger LOG = LoggerFactory.getLogger(PNRIndexElasticSearchBoltJest.class);

    // Created in prepare(); transient because bolts are serialized to workers.
    private transient PNRIndexElasticSearchProcessor pnrIndexElasticSearchProcessor;
    private transient OutputCollector collector;
    // Credentials arrive AES-encrypted and are decrypted in place by prepare().
    private String username;
    private String password;
    private String ip;
    private String port;
    // Tick-tuple interval driving periodic batch flushes (see getComponentConfiguration()).
    private int flushIntervalSecs;
    private transient PnrBatchHelper batchHelper;
    private transient ReducedMetric ingestTimeMetric;

    /**
     * Builds the bolt with its Elasticsearch connection settings.
     *
     * @param username  encrypted ES username (decrypted in prepare())
     * @param password  encrypted ES password (decrypted in prepare())
     * @param ip        Elasticsearch host
     * @param port      Elasticsearch port
     * @param flushInvl flush interval in seconds for partially filled batches
     */
    public PNRIndexElasticSearchBoltJest(String username, String password, String ip, String port, int flushInvl) {
        this.username = username;
        this.password = password;
        this.ip = ip;
        this.port = port;
        this.flushIntervalSecs = flushInvl;
    }
- /**
- * @author sandeep.gutha
- * Main method of bolt invoked each time tuple is received.
- * Processes tuples in the batch manner.
- * If batch is not full then it accumulates the tuples.
- * Batch is flushed if it's full or time is expired.
- */
- public void execute(Tuple input) {
- if (batchHelper.shouldHandle(input)) {
- recordStats(input);
- batchHelper.addTuple(input);
- }
- if (batchHelper.shouldFlush()) {
- flushTuples();
- }
- }
- private void failAllTuples() {
- batchHelper.getTuples().forEach((tuple) -> collector.fail(tuple));
- batchHelper.clear();
- }
- private void recordStats(Tuple input) {
- long receivedTime = System.currentTimeMillis();
- PNRIndexBean pnrIndexBean = extractPnrIndexBeanFromTuple(input);
- pnrIndexBean.getStats().setMapDur(new PNRIndexStats.TimeBlock(
- pnrIndexBean.getStats().getDcdDur().getStopTs(), receivedTime));
- }
/**
 * Mapping between the CRS code (first two characters of the row key) and the
 * timezone in which that system reports the RDT date field. CRS codes not
 * listed here fall back to UTC (see extractDateField).
 */
private Map<String, String> crsToTimezoneMap = ImmutableMap.of(
        "1G", "UTC",
        "1V", "America/Denver",
        "1P", "America/New_York",
        "1J", "America/New_York");
- /**
- * Converts input date with timezone to UTC
- * @param input
- * @param inputTimezone
- * @return time in millis since epoch
- */
- private long convertTimeToUTC(Date input, String inputTimezone) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
- String dtString = sdf.format(input);
- sdf.setTimeZone(TimeZone.getTimeZone(inputTimezone));
- try {
- return sdf.parse(dtString).getTime();
- } catch (ParseException e) {
- throw new IllegalArgumentException(e);
- }
- }
- private long extractDateField(Tuple tuple) {
- PNRIndexBean pnrIndexBean = extractPnrIndexBeanFromTuple(tuple);
- String crs = pnrIndexBean.getRowKey().substring(0, 2).toUpperCase();
- String timezone = crsToTimezoneMap.getOrDefault(crs, "UTC");
- Date dt = Optional.ofNullable(pnrIndexBean.getBp08RDT())
- .orElseGet(() -> Optional.ofNullable(pnrIndexBean.getDpteTPFTS()).orElseGet(Date::new));
- return convertTimeToUTC(dt, timezone);
- }
- private void flushTuples() {
- List<Tuple> tuples = batchHelper.getTuples();
- List<BulkIngestRequest> requests = tuples.stream()
- .map(tuple -> new BulkIngestRequest(extractMessageFromTuple(tuple), extractIndexUrlFromTuple(tuple), extractDateField(tuple)))
- .collect(Collectors.toList());
- long startIngestTs = System.currentTimeMillis();
- try {
- List<BulkIngestResponse> responses = pnrIndexElasticSearchProcessor.batchIngest(requests);
- processResponses(responses,new TimeBlock(startIngestTs,System.currentTimeMillis()));
- } catch (ElasticSearchConnectionException e) {
- LOG.error("Error Occured while processing the ES Index", e);
- failAllTuples();
- }
- }
- private void processResponses(List<BulkIngestResponse> responses,TimeBlock indDur) {
- List<Tuple> tuples = batchHelper.getTuples();
- int ackedCnt=0;
- for (int i=0; i< responses.size() && i < tuples.size(); i++) {
- ackedCnt+=ackTuple(tuples.get(i),responses.get(i),indDur);
- }
- if ( ingestTimeMetric != null ) {
- ingestTimeMetric.update(indDur.timeDifference());
- }
- //fail all unmatched tuples
- for (int i = responses.size();i < tuples.size(); i++) {
- collector.fail(tuples.get(i));
- }
- LOG.info("Batch size: {}. Successfully processed {} out of {} requests",tuples.size(),ackedCnt, responses.size());
- batchHelper.clear();
- }
- private int ackTuple(Tuple tuple, BulkIngestResponse response,TimeBlock indDur) {
- int result = 0;
- PNRIndexBean bean = extractPnrIndexBeanFromTuple(tuple);
- bean.getStats().setIndDur(indDur);
- long waitMs = bean.getStats().getIndDur().getStartTs().getTime() - bean.getStats().getMapDur().getStopTs().getTime();
- boolean hasError = false;
- if (response.getEsErrors().size() > 0) {
- hasError = true;
- ElasticError eerr = response.getEsErrors().get(0);
- //ignore version conflicts
- if (!eerr.getType().startsWith("version_conflict")) {
- LOG.error("{}\tWait\t{}\tPROC\t{}\t{}\t{}",extractIndexUrlFromTuple(tuple),waitMs,bean.getStats().getIndDur().timeDifference(),
- eerr.getType(),eerr.getReason());
- }
- } else {
- LOG.info("{}\tWait\t{}\tPROC\t{}",extractIndexUrlFromTuple(tuple),waitMs,
- bean.getStats().getIndDur().timeDifference());
- }
- collector.ack(tuple);
- result+=1;
- if (!hasError && isExecuteEvent(tuple)) {
- emitDownstreamEvent(tuple);
- }
- return result;
- }
- /**
- * This method will load all the ElasticSearch Configurations from the
- * properties file A High AES Encryption algorithm call is implemented to
- * load the values for security and connect to client using connection pool
- * for singleton connection
- *
- * @param stormConf
- * @param context
- * @param outCollector
- */
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector outCollector) {
- collector = outCollector;
- PNRIndexStrongAES strongAes = new PNRIndexStrongAES(
- (String) stormConf.get(ConfigurationParams.ENC_KEY));
- username = strongAes.decrypt(username);
- password = strongAes.decrypt(password);
- String elasticCompleteUri = ConfigurationParams.HTTP_VAL + ConfigurationParams.COLON + ConfigurationParams.DOUBLE_FWD_SLASH + ip
- + ConfigurationParams.COLON + port + ConfigurationParams.FWD_SLASH;
- pnrIndexElasticSearchProcessor = new PNRIndexElasticSearchProcessor(elasticCompleteUri, username, password);
- int batchSize = Integer.parseInt((String)stormConf.get(ConfigurationParams.ES_BATCH_SIZE));
- this.batchHelper = new PnrBatchHelper(batchSize);
- this.ingestTimeMetric = context.registerMetric(METRICS_NAME, new MeanReducer(), DEF_METR_WIN);
- }
- protected void emitDownstreamEvent(Tuple input) {
- PNRIndexBean pnrIndxBean = extractPnrIndexBeanFromTuple(input);
- String docId = pnrIndxBean.getRowKey();
- String crs = docId.substring(0, 2);
- String recordLocator = docId.substring(2, docId.length() - 8).toUpperCase();
- String dt = docId.substring(docId.length() - 8);
- collector.emit(input, new Values(new PnrEventMsg(docId, recordLocator, crs,
- pnrIndxBean.getPnrVersion(), dt, pnrIndxBean.getPseudoCityCode())));
- }
/** Declares the single output field carrying the downstream PnrEventMsg. */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("pnrEventMsg"));
}
- /**
- * Does the component config: adds tick interval into the config
- *
- * @return modified config
- */
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
- }
// Tuple field layout (NOTE(review): inferred from usage in this class — confirm
// against the emitting component): 0 = PNRIndexBean, 1 = document body,
// 2 = index URL, 3 = execute-event flag.

/** Returns the PNRIndexBean carried in tuple field 0. */
private PNRIndexBean extractPnrIndexBeanFromTuple(Tuple tuple) {
    return (PNRIndexBean) tuple.getValue(0);
}

/** Returns the target index URL carried in tuple field 2. */
private String extractIndexUrlFromTuple(Tuple tuple) {
    return tuple.getString(2);
}

/** Returns the document body carried in tuple field 1. */
private String extractMessageFromTuple(Tuple tuple) {
    return tuple.getString(1);
}

/** Returns the execute-event flag carried in tuple field 3. */
private Boolean isExecuteEvent(Tuple tuple) {
    return tuple.getBoolean(3);
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement