Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.utrack.extservice.cache;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Map.Entry;
- import java.util.function.Predicate;
- import java.util.stream.Collectors;
- import org.apache.commons.lang3.time.DateUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.utrack.db.eta.logic.IJourneyLiveEtaManager;
- import com.utrack.db.eta.logic.ILiveEtaManager;
- import com.utrack.db.eta.model.JourneyLiveEta;
- import com.utrack.db.eta.model.LiveEta;
- import com.utrack.util.FormatHelper;
- import com.utrack.util.JourneyLiveEtaUtils;
- import com.utrack.util.TimeHelper;
- /**
- * LiveEtaCacheCreator for in memory custom cache
- * <p>
- * $LastChangedDate$<br>
- * $LastChangedBy$<br>
- * $Revision$<br>
- * </p>
- *
- * @author Justas
- */
- public class LiveEtaCacheCreator {
- private static final Logger LOG = LoggerFactory.getLogger(LiveEtaCacheCreator.class);
- private ILiveEtaManager liveEtaManager;
- private IJourneyLiveEtaManager journeyLiveEtaManager;
- private CacheInMemoryHolder cacheInMemoryHolder;
- @Autowired
- private LiveEtaCacheCleaner liveEtaCacheCleaner;
- /**
- * Number of last JounreyLiveEta dates to reload. As example if property
- * value 3 and in DB we have journey live eta for dates
- * 2017-10-01,2017-10-02, 2017-10-09, 2017-10-11 cache updater will reload
- * only 2017-10-02, 2017-10-09, 2017-10-11 dates
- */
- private Integer numberOfLastJourneyDatesToReload = null;
- private Date lastCacheRefreshTime = null;
- public void init() {
- LOG.info("numberOfJourneyDatesToReload={}", numberOfLastJourneyDatesToReload);
- }
- public void updateLiveEtaCache() {
- LOG.info("updateLiveEtaCache start");
- long startTime = System.currentTimeMillis();
- if (numberOfLastJourneyDatesToReload != null) {
- refreshLiveEtaDataCache();
- } else {
- refreshByCacheTimestamp();
- }
- if (LOG.isInfoEnabled()) {
- long duration = System.currentTimeMillis() - startTime;
- String durationStr = TimeHelper.durationToString(duration);
- LOG.info("updateLiveEtaCache updated. [{} ms] [{}]", duration, durationStr);
- }
- // Every time the cache is refresh a new date will be set -5 Minutes, so
- // we make sure to get all etas.
- lastCacheRefreshTime = DateUtils.addMinutes(new Date(), -5);
- }
- private void refreshByCacheTimestamp() {
- Map<Date, List<JourneyLiveEta>> currentJourneyLiveCache = cacheInMemoryHolder.getCachedJourneyLiveEtas();
- List<Date> dbDates = journeyLiveEtaManager.findAllDates();
- LOG.info("journeyLiveEtaDates.size={}", dbDates.size());
- if (dbDates.size() > 0) {
- Date mostRecentJourneyDate = dbDates.get(dbDates.size() - 1);
- if (cacheAlreadyInitialized(currentJourneyLiveCache)) {
- if (dbDates.size() > 0) {
- List<LiveEta> liveEtas = liveEtaManager.findByLastModifiedTimestampAfter(lastCacheRefreshTime);
- LOG.info("liveEtas.size={}", liveEtas.size());
- if (liveEtas.size() > 0) {
- Map<Date, List<JourneyLiveEta>> newJourneyLiveEtasMappedOnDate =
- JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
- for (Map.Entry<Date, List<JourneyLiveEta>> entry : newJourneyLiveEtasMappedOnDate.entrySet()) {
- Date dateEntry = entry.getKey();
- List<JourneyLiveEta> newJourneyLiveEtas = entry.getValue();
- List<JourneyLiveEta> existingJourneyLiveEtas = currentJourneyLiveCache.get(dateEntry);
- if (existingJourneyLiveEtas != null) {
- Map<Integer, JourneyLiveEta> existingJourneys = mapToDBId(existingJourneyLiveEtas);
- mergeUpdatableWithExisting(existingJourneys, newJourneyLiveEtas);
- currentJourneyLiveCache.replace(dateEntry, extractMapValues(existingJourneys));
- } else {
- currentJourneyLiveCache.put(dateEntry, newJourneyLiveEtas);
- }
- }
- }
- }
- updateAndCleanJourneyLiveEtaCacheIfRequired(currentJourneyLiveCache, mostRecentJourneyDate);
- buildBusStopIdToLiveEtaCache();
- } else {
- LOG.info("initial cache loading... starting.");
- preformInitialCacheLoad(mostRecentJourneyDate);
- buildBusStopIdToLiveEtaCache();
- LOG.info("initial cache loading... done.");
- }
- }
- }
- private void refreshLiveEtaDataCache() {
- Map<Date, List<JourneyLiveEta>> oldDateJourneyLiveEtaMap = cacheInMemoryHolder.getCachedJourneyLiveEtas();
- Map<Date, List<JourneyLiveEta>> dateJourneyLiveEtaMap = null;
- if (cacheAlreadyInitialized(oldDateJourneyLiveEtaMap) && numberOfLastJourneyDatesToReload != null) {
- List<Date> dates = journeyLiveEtaManager.findAllDates();
- LOG.debug("journeyLiveEta dates.size={}", dates.size());
- if (dates.size() > 0) {
- final Date startDate = dates.get(Math.max(0, dates.size() - numberOfLastJourneyDatesToReload));
- Date endDate = dates.get(dates.size() - 1);
- List<LiveEta> liveEtas = liveEtaManager.findByDateRange(startDate, endDate);
- LOG.debug("liveEtas.size={} for range startDate={}, endDate={}", liveEtas.size(),
- FormatHelper.timestampToStringLog(startDate), FormatHelper.timestampToStringLog(endDate));
- dateJourneyLiveEtaMap = oldDateJourneyLiveEtaMap.entrySet()
- .stream()
- // filter dates before start date (because we going to
- // reload live eta [startDate; endDate]
- .filter(e -> e.getKey()
- .before(startDate))
- // filter dates already deleted from db
- .filter(filterDatesAlreadyDeletedFromDb(dates))
- .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
- Map<Date, List<JourneyLiveEta>> dateRangeJourneyLiveEtaMap =
- JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
- // update old map data with reloaded journeys
- dateJourneyLiveEtaMap.putAll(dateRangeJourneyLiveEtaMap);
- } else {
- dateJourneyLiveEtaMap = new HashMap<>();
- }
- } else {
- List<LiveEta> liveEtas = liveEtaManager.findAllLazy();
- LOG.debug("liveEtas.size={}", liveEtas.size());
- dateJourneyLiveEtaMap = JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
- LOG.debug("dateJourneyLiveEtaMap.size={}", dateJourneyLiveEtaMap.size());
- }
- Map<String, List<LiveEta>> tmpCacheLiveEtasByBusStops = dateJourneyLiveEtaMap.values()
- .stream()
- .flatMap(ds -> ds.stream())
- .flatMap(jle -> jle.getStopLiveEta()
- .stream())
- .collect(Collectors.groupingBy(le -> le.getBusStopId()));
- LOG.debug("cacheLiveEtasByBusStops.size={}", tmpCacheLiveEtasByBusStops.size());
- cacheInMemoryHolder.setCachedLiveEtasByBusStops(tmpCacheLiveEtasByBusStops);
- cacheInMemoryHolder.setCachedJourneyLiveEtas(dateJourneyLiveEtaMap);
- }
- private void mergeUpdatableWithExisting(final Map<Integer, JourneyLiveEta> existingJourneys,
- final List<JourneyLiveEta> newJourneyLiveEtas) {
- newJourneyLiveEtas.stream()
- .forEach(journeyLiveEta -> {
- Integer dbId = journeyLiveEta.getId();
- JourneyLiveEta existingJourneyLiveEta = existingJourneys.get(dbId);
- if (existingJourneyLiveEta != null) {
- List<LiveEta> stopLiveEtas = existingJourneyLiveEta.getStopLiveEta();
- if (stopLiveEtas != null && stopLiveEtas.size() > 0) {
- Map<Integer, LiveEta> stopLiveEtasMap = mapToDbIds(stopLiveEtas);
- List<LiveEta> newStopLiveEtas = journeyLiveEta.getStopLiveEta();
- if (newStopLiveEtas != null) {
- for (int i = 0; i < newStopLiveEtas.size(); i++) {
- LiveEta liveEta = newStopLiveEtas.get(i);
- // merge the new with the old.
- stopLiveEtasMap.put(liveEta.getId(), liveEta);
- }
- List<LiveEta> updatedLiveEtas = extractLiveEtaMapValues(stopLiveEtasMap);
- updatedLiveEtas.sort(JourneyLiveEta.JOURNEY_LIVE_ETA_STOPS_COMPARATOR);
- // set each journeyLiveEtas reference to point
- // to the new obj.
- updatedLiveEtas.stream()
- .forEach(liveEta -> liveEta.setJourneyLiveEta(journeyLiveEta));
- journeyLiveEta.setStopLiveEta(updatedLiveEtas);
- // clean references to make sure no objects are
- // left pointing to the new live etas.
- cleanReferences(stopLiveEtas, stopLiveEtasMap, existingJourneyLiveEta);
- }
- }
- }
- existingJourneys.put(dbId, journeyLiveEta);
- });
- }
- private List<LiveEta> extractLiveEtaMapValues(final Map<Integer, LiveEta> stopLiveEtasMap) {
- return stopLiveEtasMap.values()
- .stream()
- .collect(Collectors.toList());
- }
- private Map<Integer, LiveEta> mapToDbIds(final List<LiveEta> stopLiveEtas) {
- return stopLiveEtas.stream()
- .collect(Collectors.toMap(le -> le.getId(), le -> le));
- }
- private List<JourneyLiveEta> extractMapValues(final Map<Integer, JourneyLiveEta> existingJourneyLiveEtaMap) {
- return existingJourneyLiveEtaMap.values()
- .stream()
- .collect(Collectors.toList());
- }
- private Map<Integer, JourneyLiveEta> mapToDBId(final List<JourneyLiveEta> existingJourneyLiveEtas) {
- return existingJourneyLiveEtas.stream()
- .collect(Collectors.toMap(jle -> jle.getId(), jle -> jle));
- }
- private Predicate<? super Entry<Date, List<JourneyLiveEta>>>
- filterDatesAlreadyDeletedFromDb(final List<Date> dates) {
- return e -> dates.stream()
- .filter(d -> d.equals(e.getKey()))
- .findAny()
- .isPresent();
- }
- private Map<String, List<LiveEta>>
- groupLiveEtasByBusStopId(final Map<Date, List<JourneyLiveEta>> dateJourneyLiveEtaMap) {
- return dateJourneyLiveEtaMap.values()
- .stream()
- .flatMap(ds -> ds.stream())
- .flatMap(jle -> jle.getStopLiveEta()
- .stream())
- .collect(Collectors.groupingBy(le -> le.getBusStopId()));
- }
- private void cleanReferences(final List<LiveEta> stopLiveEtas, final Map<Integer, LiveEta> stopLiveEtasMap,
- final JourneyLiveEta existingJourneyLiveEta) {
- stopLiveEtas.clear();
- stopLiveEtasMap.clear();
- existingJourneyLiveEta.getStopLiveEta()
- .clear();
- }
- private void buildBusStopIdToLiveEtaCache() {
- // use the journeyLiveEtas after the cacheCleanup.
- Map<Date, List<JourneyLiveEta>> currentJourneyLiveCache = cacheInMemoryHolder.getCachedJourneyLiveEtas();
- Map<String, List<LiveEta>> tmpCacheLiveEtasByBusStops = groupLiveEtasByBusStopId(currentJourneyLiveCache);
- LOG.debug("cacheLiveEtasByBusStops.size={}", tmpCacheLiveEtasByBusStops.size());
- cacheInMemoryHolder.setCachedLiveEtasByBusStops(tmpCacheLiveEtasByBusStops);
- }
- private void updateAndCleanJourneyLiveEtaCacheIfRequired(
- final Map<Date, List<JourneyLiveEta>> dateJourneyLiveEtaMap, final Date date) {
- LOG.debug("dateJourneyLiveEtaMap.size={}", dateJourneyLiveEtaMap.size());
- cacheInMemoryHolder.setCachedJourneyLiveEtas(dateJourneyLiveEtaMap);
- if (liveEtaCacheCleaner.liveEtaCacheRequiresCleanup()) {
- liveEtaCacheCleaner.cleanDatesFromCacheExcept(date);
- }
- }
- private void preformInitialCacheLoad(final Date initialLoadDate) {
- List<LiveEta> liveEtas = liveEtaManager.findByDate(initialLoadDate);
- LOG.info("liveEtas.size={}", liveEtas.size());
- if (liveEtas.size() > 0) {
- Map<Date, List<JourneyLiveEta>> initiallyLoadedJourneyLiveEtas =
- JourneyLiveEtaUtils.groupLiveEtaByJourneyAndDate(liveEtas);
- updateAndCleanJourneyLiveEtaCacheIfRequired(initiallyLoadedJourneyLiveEtas, initialLoadDate);
- }
- }
- private boolean cacheAlreadyInitialized(final Map<Date, List<JourneyLiveEta>> currentJourneyLiveCache) {
- return currentJourneyLiveCache != null && currentJourneyLiveCache.size() > 0;
- }
- public void setLiveEtaManager(final ILiveEtaManager liveEtaManager) {
- this.liveEtaManager = liveEtaManager;
- }
- public void setCacheInMemoryHolder(final CacheInMemoryHolder cacheInMemoryHolder) {
- this.cacheInMemoryHolder = cacheInMemoryHolder;
- }
- public void setJourneyLiveEtaManager(final IJourneyLiveEtaManager journeyLiveEtaManager) {
- this.journeyLiveEtaManager = journeyLiveEtaManager;
- }
- public void setNumberOfLastJourneyDatesToReload(final Integer numberOfLastJourneyDatesToReload) {
- this.numberOfLastJourneyDatesToReload = numberOfLastJourneyDatesToReload;
- }
- public void setLastCacheRefreshTime(final Date lastCacheRefreshTime) {
- this.lastCacheRefreshTime = lastCacheRefreshTime;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement