Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.company;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- import java.util.function.LongToIntFunction;
- import java.util.function.LongUnaryOperator;
- public class Aggregation {
- private static final String CASSANDRA_SUMMARY = "summary";
- private static final String CASSANDRA_SUMMARY_1H = "summary_1h";
- private static final String CASSANDRA_SUMMARY_1D = "summary_1d";
- private static final String CASSANDRA_SUMMARY_1W = "summary_1w";
- private static final String CASSANDRA_TIME = "time";
- private static final String CASSANDRA_TIME_1H = "time_1h";
- private static final String CASSANDRA_TIME_1D = "time_1d";
- private static final String CASSANDRA_TIME_1W = "time_1w";
- private static final String CASSANDRA_HISTORY = "history";
- private static final String CASSANDRA_KEY = "key";
- private static final Map<String, String> CASSANDRA_SUMMARIES = new HashMap<>() {{
- put("1h", CASSANDRA_SUMMARY_1H);
- put("1d", CASSANDRA_SUMMARY_1D);
- put("1w", CASSANDRA_SUMMARY_1W);
- }};
- private static final Map<String, String> CASSANDRA_TIMES = new HashMap<>() {{
- put("1h", CASSANDRA_TIME_1H);
- put("1d", CASSANDRA_TIME_1D);
- put("1w", CASSANDRA_TIME_1W);
- }};
- //replacement of time1h, time1d, time1w
- private final Map<String, Long> TIME1 = new HashMap<String, Long>() {{
- put("1h", 0L);
- put("1d", 0L);
- put("1w", 0L); }};
- private final Map<String, Int2ObjectOpenHashMap<SummaryBase>> summary1Map = new HashMap<>() {{
- put("1h", new Int2ObjectHashMap<>());
- put("1d", new Int2ObjectHashMap<>());
- put("1w", new Int2ObjectHashMap<>());
- }};
- private final Map<String, HistoryBase> history1 = new HashMap<>();
- private static final Map<String, LongToIntFunction> TIMES = new HashMap<>();
- static {
- TIMES.put("summary", SummaryBase::getKey15min);
- TIMES.put("1h", SummaryBase::getKey1h);
- TIMES.put("1d", SummaryBase::getKey1d);
- TIMES.put("1w", SummaryBase::getKey1w);
- }
- private static final Map<String, String> TABS = new HashMap<>();
- static {
- TABS.put("summary", "time");
- TABS.put("summary_1h", "time_1h");
- TABS.put("summary_1d", "time_1d");
- TABS.put("summary_1w", "time_1w");
- }
- private static final Map<String, Integer> INTERVALS = new HashMap<>();
- static {
- //INTERVALS.put("summary", (int) TimeUnit.MINUTES.toMillis(15));
- INTERVALS.put("1h", (int) TimeUnit.HOURS.toMillis(1));
- INTERVALS.put("1d", (int) TimeUnit.DAYS.toMillis(1));
- INTERVALS.put("1w", (int) TimeUnit.DAYS.toMillis(7));
- }
- public static final Map<String, LongUnaryOperator> TIME1X = new HashMap<>();
- static {
- TIME1X.put("summary", (value) -> (value + TimeUnit.SECONDS.toMillis(449)) / TimeUnit.MINUTES.toMillis(15) * TimeUnit.MINUTES.toMillis(15));
- TIME1X.put("summary_1h", (value) -> (value + TimeUnit.HOURS.toMillis(1) - 1) / TimeUnit.HOURS.toMillis(1) * TimeUnit.HOURS.toMillis(1));
- TIME1X.put("summary_1d", (value) -> ((value + TimeUnit.HOURS.toMillis(3) + TimeUnit.DAYS.toMillis(1) - 1) / TimeUnit.DAYS.toMillis(1) * TimeUnit.DAYS.toMillis(1)) - TimeUnit.HOURS.toMillis(3));
- TIME1X.put("summary_1w", (value) -> ((value + TimeUnit.HOURS.toMillis(3) + TimeUnit.DAYS.toMillis(3) - 1) / TimeUnit.DAYS.toMillis(7) * TimeUnit.DAYS.toMillis(7)) + TimeUnit.DAYS.toMillis(4) - TimeUnit.HOURS.toMillis(3));
- }
- private void cassandraInsert(String aggr, long time, Int2ObjectMap<SummaryBase> summaryMap, HistoryBase history) {
- final long time15min = (time + TimeUnit.SECONDS.toMillis(449)) / TimeUnit.MINUTES.toMillis(15) * TimeUnit.MINUTES.toMillis(15);
- if (this.TIME1.get(aggr) == 0) {
- long _time1 = TIME1X.get(aggr).applyAsLong(time15min);
- summary1Map.get(aggr).clear();
- history1.put(aggr, cassandraHistoryModeller.newInstance());
- history1.get(aggr).setTable(Utils.stringToBytesASCII(CASSANDRA_SUMMARIES.get(aggr)));
- history1.get(aggr).setStart(_time1 - INTERVALS.get(aggr));
- history1.get(aggr).setStop(_time1);
- cassandraSelect(time15min, _time1, this.summary1Map, this.history1);
- this.TIME1.put(aggr, TIME1.get(aggr));
- }
- if (time15min < TIME1.get(aggr)) {
- LOG.info("Aggregation time = {} ==> hour = {}", ToStringHelper.millsToString(time), ToStringHelper.millsToString(time1h));
- doAppend(summaryMap, time15min, history, time1h - TimeUnit.HOURS.toMillis(1), TIME1.get(aggr), summary1Map.get(aggr), SummaryBase.getKey1h(time1h), history1h);
- }
- else if (time15min == time1h) {
- LOG.info("Aggregation time = {} ==> hour = {}", ToStringHelper.millsToString(time), ToStringHelper.millsToString(time1h));
- doAppend(summaryMap, time15min, history, time1h - TimeUnit.HOURS.toMillis(1), time1h, summary1hMap, SummaryBase.getKey1h(time1h), history1h);
- LOG.info("Aggregation insert {} rows for hour = {}", summary1hMap.size(), ToStringHelper.millsToString(time1h));
- if (_1h)
- doInsert(CASSANDRA_SUMMARY_1H, summary1hMap/*, CASSANDRA_TIME_1H, SummaryBase.getKey1h(time1h), time1h - TimeUnit.HOURS.toMillis(1)*/, time1h, history1h);
- time1h += TimeUnit.HOURS.toMillis(1);
- summary1hMap.clear();
- history1h = cassandraHistoryModeller.newInstance();
- history1h.setTable(Utils.stringToBytesASCII(CASSANDRA_SUMMARY_1H));
- history1h.setStart(time1h - TimeUnit.HOURS.toMillis(1));
- history1h.setStop(time1h);
- }
- else {
- LOG.info("MISSED aggregation time = {} >=< hour = {}", ToStringHelper.millsToString(time), ToStringHelper.millsToString(time1h));
- LOG.info("Aggregation insert {} rows for hour = {}", summary1hMap.size(), ToStringHelper.millsToString(time1h));
- if (_1h)
- doInsert(CASSANDRA_SUMMARY_1H, summary1hMap/*, CASSANDRA_TIME_1H, SummaryBase.getKey1h(time1h), time1h - TimeUnit.HOURS.toMillis(1)*/, time1h, history1h);
- time1h = (time15min + TimeUnit.HOURS.toMillis(1) - 1) / TimeUnit.HOURS.toMillis(1) * TimeUnit.HOURS.toMillis(1);
- summary1hMap.clear();
- history1h = cassandraHistoryModeller.newInstance();
- history1h.setTable(Utils.stringToBytesASCII(CASSANDRA_SUMMARY_1H));
- history1h.setStart(time1h - TimeUnit.HOURS.toMillis(1));
- history1h.setStop(time1h);
- doAppend(summaryMap, time15min, history, time1h - TimeUnit.HOURS.toMillis(1), time1h, summary1hMap, SummaryBase.getKey1h(time1h), history1h);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement