Advertisement
Guest User

Untitled

a guest
Nov 15th, 2019
161
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 12.08 KB | None | 0 0
  1. package com.utrack.extservice.cache;
  2.  
  3. import java.util.Date;
  4. import java.util.HashMap;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Map.Entry;
  8. import java.util.function.Predicate;
  9. import java.util.stream.Collectors;
  10.  
  11. import org.apache.commons.lang3.time.DateUtils;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15.  
  16. import com.utrack.db.eta.logic.IJourneyLiveEtaManager;
  17. import com.utrack.db.eta.logic.ILiveEtaManager;
  18. import com.utrack.db.eta.model.JourneyLiveEta;
  19. import com.utrack.db.eta.model.LiveEta;
  20. import com.utrack.util.FormatHelper;
  21. import com.utrack.util.JourneyLiveEtaUtils;
  22. import com.utrack.util.TimeHelper;
  23.  
  24. /**
  25.  * LiveEtaCacheCreator for in memory custom cache
  26.  * <p>
  27.  * $LastChangedDate$<br>
  28.  * $LastChangedBy$<br>
  29.  * $Revision$<br>
  30.  * </p>
  31.  *
  32.  * @author Justas
  33.  */
  34. public class LiveEtaCacheCreator {
  35.  
  36.     private static final Logger LOG = LoggerFactory.getLogger(LiveEtaCacheCreator.class);
  37.  
  38.     private ILiveEtaManager liveEtaManager;
  39.  
  40.     private IJourneyLiveEtaManager journeyLiveEtaManager;
  41.  
  42.     private CacheInMemoryHolder cacheInMemoryHolder;
  43.  
  44.     @Autowired
  45.     private LiveEtaCacheCleaner liveEtaCacheCleaner;
  46.  
  47.     /**
  48.      * Number of last JounreyLiveEta dates to reload. As example if property
  49.      * value 3 and in DB we have journey live eta for dates
  50.      * 2017-10-01,2017-10-02, 2017-10-09, 2017-10-11 cache updater will reload
  51.      * only 2017-10-02, 2017-10-09, 2017-10-11 dates
  52.      */
  53.     private Integer numberOfLastJourneyDatesToReload = null;
  54.  
  55.     private Date lastCacheRefreshTime = null;
  56.  
  57.     public void init() {
  58.         LOG.info("numberOfJourneyDatesToReload={}", numberOfLastJourneyDatesToReload);
  59.     }
  60.  
  61.     public void updateLiveEtaCache() {
  62.         LOG.info("updateLiveEtaCache start");
  63.         long startTime = System.currentTimeMillis();
  64.  
  65.         if (numberOfLastJourneyDatesToReload != null) {
  66.             refreshLiveEtaDataCache();
  67.         } else {
  68.             refreshByCacheTimestamp();
  69.         }
  70.  
  71.         if (LOG.isInfoEnabled()) {
  72.             long duration = System.currentTimeMillis() - startTime;
  73.             String durationStr = TimeHelper.durationToString(duration);
  74.             LOG.info("updateLiveEtaCache updated. [{} ms] [{}]", duration, durationStr);
  75.         }
  76.  
  77.         // Every time the cache is refresh a new date will be set -5 Minutes, so
  78.         // we make sure to get all etas.
  79.         lastCacheRefreshTime = DateUtils.addMinutes(new Date(), -5);
  80.     }
  81.  
  82.     private void refreshByCacheTimestamp() {
  83.         Map<Date, List<JourneyLiveEta>> currentJourneyLiveCache = cacheInMemoryHolder.getCachedJourneyLiveEtas();
  84.  
  85.         List<Date> dbDates = journeyLiveEtaManager.findAllDates();
  86.         LOG.info("journeyLiveEtaDates.size={}", dbDates.size());
  87.         if (dbDates.size() > 0) {
  88.             Date mostRecentJourneyDate = dbDates.get(dbDates.size() - 1);
  89.            
  90.             if (cacheAlreadyInitialized(currentJourneyLiveCache)) {
  91.                 if (dbDates.size() > 0) {
  92.                     List<LiveEta> liveEtas = liveEtaManager.findByLastModifiedTimestampAfter(lastCacheRefreshTime);
  93.                     LOG.info("liveEtas.size={}", liveEtas.size());
  94.  
  95.                     if (liveEtas.size() > 0) {
  96.                         Map<Date, List<JourneyLiveEta>> newJourneyLiveEtasMappedOnDate =
  97.                                 JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
  98.  
  99.                         for (Map.Entry<Date, List<JourneyLiveEta>> entry : newJourneyLiveEtasMappedOnDate.entrySet()) {
  100.                             Date dateEntry = entry.getKey();
  101.  
  102.                             List<JourneyLiveEta> newJourneyLiveEtas = entry.getValue();
  103.                             List<JourneyLiveEta> existingJourneyLiveEtas = currentJourneyLiveCache.get(dateEntry);
  104.                             if (existingJourneyLiveEtas != null) {
  105.                                 Map<Integer, JourneyLiveEta> existingJourneys = mapToDBId(existingJourneyLiveEtas);
  106.                                
  107.                                 mergeUpdatableWithExisting(existingJourneys, newJourneyLiveEtas);
  108.  
  109.                                 currentJourneyLiveCache.replace(dateEntry, extractMapValues(existingJourneys));
  110.                             } else {
  111.                                 currentJourneyLiveCache.put(dateEntry, newJourneyLiveEtas);
  112.                             }
  113.                         }
  114.                     }
  115.                 }
  116.                 updateAndCleanJourneyLiveEtaCacheIfRequired(currentJourneyLiveCache, mostRecentJourneyDate);
  117.                 buildBusStopIdToLiveEtaCache();
  118.             } else {
  119.                 LOG.info("initial cache loading... starting.");
  120.                 preformInitialCacheLoad(mostRecentJourneyDate);
  121.                 buildBusStopIdToLiveEtaCache();
  122.                 LOG.info("initial cache loading... done.");
  123.             }
  124.         }
  125.     }
  126.  
  127.     private void refreshLiveEtaDataCache() {
  128.         Map<Date, List<JourneyLiveEta>> oldDateJourneyLiveEtaMap = cacheInMemoryHolder.getCachedJourneyLiveEtas();
  129.  
  130.         Map<Date, List<JourneyLiveEta>> dateJourneyLiveEtaMap = null;
  131.  
  132.         if (cacheAlreadyInitialized(oldDateJourneyLiveEtaMap) && numberOfLastJourneyDatesToReload != null) {
  133.             List<Date> dates = journeyLiveEtaManager.findAllDates();
  134.             LOG.debug("journeyLiveEta dates.size={}", dates.size());
  135.             if (dates.size() > 0) {
  136.                 final Date startDate = dates.get(Math.max(0, dates.size() - numberOfLastJourneyDatesToReload));
  137.                 Date endDate = dates.get(dates.size() - 1);
  138.  
  139.                 List<LiveEta> liveEtas = liveEtaManager.findByDateRange(startDate, endDate);
  140.                 LOG.debug("liveEtas.size={} for range startDate={}, endDate={}", liveEtas.size(),
  141.                         FormatHelper.timestampToStringLog(startDate), FormatHelper.timestampToStringLog(endDate));
  142.  
  143.                 dateJourneyLiveEtaMap = oldDateJourneyLiveEtaMap.entrySet()
  144.                         .stream()
  145.                         // filter dates before start date (because we going to
  146.                         // reload live eta [startDate; endDate]
  147.                         .filter(e -> e.getKey()
  148.                                 .before(startDate))
  149.                         // filter dates already deleted from db
  150.                         .filter(filterDatesAlreadyDeletedFromDb(dates))
  151.                         .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
  152.  
  153.                 Map<Date, List<JourneyLiveEta>> dateRangeJourneyLiveEtaMap =
  154.                         JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
  155.  
  156.                 // update old map data with reloaded journeys
  157.                 dateJourneyLiveEtaMap.putAll(dateRangeJourneyLiveEtaMap);
  158.             } else {
  159.                 dateJourneyLiveEtaMap = new HashMap<>();
  160.             }
  161.         } else {
  162.             List<LiveEta> liveEtas = liveEtaManager.findAllLazy();
  163.             LOG.debug("liveEtas.size={}", liveEtas.size());
  164.  
  165.             dateJourneyLiveEtaMap = JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
  166.  
  167.             LOG.debug("dateJourneyLiveEtaMap.size={}", dateJourneyLiveEtaMap.size());
  168.         }
  169.  
  170.         Map<String, List<LiveEta>> tmpCacheLiveEtasByBusStops = dateJourneyLiveEtaMap.values()
  171.                 .stream()
  172.                 .flatMap(ds -> ds.stream())
  173.                 .flatMap(jle -> jle.getStopLiveEta()
  174.                         .stream())
  175.                 .collect(Collectors.groupingBy(le -> le.getBusStopId()));
  176.         LOG.debug("cacheLiveEtasByBusStops.size={}", tmpCacheLiveEtasByBusStops.size());
  177.  
  178.         cacheInMemoryHolder.setCachedLiveEtasByBusStops(tmpCacheLiveEtasByBusStops);
  179.         cacheInMemoryHolder.setCachedJourneyLiveEtas(dateJourneyLiveEtaMap);
  180.     }
  181.  
  182.     private void mergeUpdatableWithExisting(final Map<Integer, JourneyLiveEta> existingJourneys,
  183.             final List<JourneyLiveEta> newJourneyLiveEtas) {
  184.         newJourneyLiveEtas.stream()
  185.                 .forEach(journeyLiveEta -> {
  186.                     Integer dbId = journeyLiveEta.getId();
  187.  
  188.                     JourneyLiveEta existingJourneyLiveEta = existingJourneys.get(dbId);
  189.                     if (existingJourneyLiveEta != null) {
  190.  
  191.                         List<LiveEta> stopLiveEtas = existingJourneyLiveEta.getStopLiveEta();
  192.                         if (stopLiveEtas != null && stopLiveEtas.size() > 0) {
  193.                             Map<Integer, LiveEta> stopLiveEtasMap = mapToDbIds(stopLiveEtas);
  194.  
  195.                             List<LiveEta> newStopLiveEtas = journeyLiveEta.getStopLiveEta();
  196.                             if (newStopLiveEtas != null) {
  197.                                 for (int i = 0; i < newStopLiveEtas.size(); i++) {
  198.                                     LiveEta liveEta = newStopLiveEtas.get(i);
  199.                                     // merge the new with the old.
  200.                                     stopLiveEtasMap.put(liveEta.getId(), liveEta);
  201.                                 }
  202.  
  203.                                 List<LiveEta> updatedLiveEtas = extractLiveEtaMapValues(stopLiveEtasMap);
  204.                                 updatedLiveEtas.sort(JourneyLiveEta.JOURNEY_LIVE_ETA_STOPS_COMPARATOR);
  205.  
  206.                                 // set each journeyLiveEtas reference to point
  207.                                 // to the new obj.
  208.                                 updatedLiveEtas.stream()
  209.                                         .forEach(liveEta -> liveEta.setJourneyLiveEta(journeyLiveEta));
  210.  
  211.                                 journeyLiveEta.setStopLiveEta(updatedLiveEtas);
  212.  
  213.                                 // clean references to make sure no objects are
  214.                                 // left pointing to the new live etas.
  215.                                 cleanReferences(stopLiveEtas, stopLiveEtasMap, existingJourneyLiveEta);
  216.                             }
  217.                         }
  218.                     }
  219.  
  220.                     existingJourneys.put(dbId, journeyLiveEta);
  221.                 });
  222.  
  223.     }
  224.  
  225.     private List<LiveEta> extractLiveEtaMapValues(final Map<Integer, LiveEta> stopLiveEtasMap) {
  226.         return stopLiveEtasMap.values()
  227.                 .stream()
  228.                 .collect(Collectors.toList());
  229.     }
  230.  
  231.     private Map<Integer, LiveEta> mapToDbIds(final List<LiveEta> stopLiveEtas) {
  232.         return stopLiveEtas.stream()
  233.                 .collect(Collectors.toMap(le -> le.getId(), le -> le));
  234.     }
  235.  
  236.     private List<JourneyLiveEta> extractMapValues(final Map<Integer, JourneyLiveEta> existingJourneyLiveEtaMap) {
  237.         return existingJourneyLiveEtaMap.values()
  238.                 .stream()
  239.                 .collect(Collectors.toList());
  240.     }
  241.  
  242.     private Map<Integer, JourneyLiveEta> mapToDBId(final List<JourneyLiveEta> existingJourneyLiveEtas) {
  243.         return existingJourneyLiveEtas.stream()
  244.                 .collect(Collectors.toMap(jle -> jle.getId(), jle -> jle));
  245.     }
  246.  
  247.     private Predicate<? super Entry<Date, List<JourneyLiveEta>>>
  248.             filterDatesAlreadyDeletedFromDb(final List<Date> dates) {
  249.         return e -> dates.stream()
  250.                 .filter(d -> d.equals(e.getKey()))
  251.                 .findAny()
  252.                 .isPresent();
  253.     }
  254.  
  255.     private Map<String, List<LiveEta>>
  256.             groupLiveEtasByBusStopId(final Map<Date, List<JourneyLiveEta>> dateJourneyLiveEtaMap) {
  257.         return dateJourneyLiveEtaMap.values()
  258.                 .stream()
  259.                 .flatMap(ds -> ds.stream())
  260.                 .flatMap(jle -> jle.getStopLiveEta()
  261.                         .stream())
  262.                 .collect(Collectors.groupingBy(le -> le.getBusStopId()));
  263.  
  264.     }
  265.  
  266.     private void cleanReferences(final List<LiveEta> stopLiveEtas, final Map<Integer, LiveEta> stopLiveEtasMap,
  267.             final JourneyLiveEta existingJourneyLiveEta) {
  268.         stopLiveEtas.clear();
  269.         stopLiveEtasMap.clear();
  270.         existingJourneyLiveEta.getStopLiveEta()
  271.                 .clear();
  272.     }
  273.  
  274.     private void buildBusStopIdToLiveEtaCache() {
  275.         // use the journeyLiveEtas after the cacheCleanup.
  276.         Map<Date, List<JourneyLiveEta>> currentJourneyLiveCache = cacheInMemoryHolder.getCachedJourneyLiveEtas();
  277.         Map<String, List<LiveEta>> tmpCacheLiveEtasByBusStops = groupLiveEtasByBusStopId(currentJourneyLiveCache);
  278.         LOG.debug("cacheLiveEtasByBusStops.size={}", tmpCacheLiveEtasByBusStops.size());
  279.         cacheInMemoryHolder.setCachedLiveEtasByBusStops(tmpCacheLiveEtasByBusStops);
  280.  
  281.     }
  282.  
  283.     private void updateAndCleanJourneyLiveEtaCacheIfRequired(
  284.             final Map<Date, List<JourneyLiveEta>> dateJourneyLiveEtaMap, final Date date) {
  285.         LOG.debug("dateJourneyLiveEtaMap.size={}", dateJourneyLiveEtaMap.size());
  286.         cacheInMemoryHolder.setCachedJourneyLiveEtas(dateJourneyLiveEtaMap);
  287.         if (liveEtaCacheCleaner.liveEtaCacheRequiresCleanup()) {
  288.             liveEtaCacheCleaner.cleanDatesFromCacheExcept(date);
  289.         }
  290.     }
  291.  
  292.     private void preformInitialCacheLoad(final Date initialLoadDate) {
  293.         List<LiveEta> liveEtas = liveEtaManager.findByDate(initialLoadDate);
  294.         LOG.info("liveEtas.size={}", liveEtas.size());
  295.         if (liveEtas.size() > 0) {
  296.             Map<Date, List<JourneyLiveEta>> initiallyLoadedJourneyLiveEtas =
  297.                     JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
  298.  
  299.             updateAndCleanJourneyLiveEtaCacheIfRequired(initiallyLoadedJourneyLiveEtas, initialLoadDate);
  300.         }
  301.     }
  302.  
  303.     private boolean cacheAlreadyInitialized(final Map<Date, List<JourneyLiveEta>> currentJourneyLiveCache) {
  304.         return currentJourneyLiveCache != null && currentJourneyLiveCache.size() > 0;
  305.     }
  306.  
  307.     public void setLiveEtaManager(final ILiveEtaManager liveEtaManager) {
  308.         this.liveEtaManager = liveEtaManager;
  309.     }
  310.  
  311.     public void setCacheInMemoryHolder(final CacheInMemoryHolder cacheInMemoryHolder) {
  312.         this.cacheInMemoryHolder = cacheInMemoryHolder;
  313.     }
  314.  
  315.     public void setJourneyLiveEtaManager(final IJourneyLiveEtaManager journeyLiveEtaManager) {
  316.         this.journeyLiveEtaManager = journeyLiveEtaManager;
  317.     }
  318.  
  319.     public void setNumberOfLastJourneyDatesToReload(final Integer numberOfLastJourneyDatesToReload) {
  320.         this.numberOfLastJourneyDatesToReload = numberOfLastJourneyDatesToReload;
  321.     }
  322.  
  323.     public void setLastCacheRefreshTime(final Date lastCacheRefreshTime) {
  324.         this.lastCacheRefreshTime = lastCacheRefreshTime;
  325.     }
  326. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement