Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.xxx.service;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Map;
- import java.util.TreeSet;
- import java.util.concurrent.atomic.AtomicBoolean;
- import org.apache.ignite.Ignite;
- import org.apache.ignite.IgniteCache;
- import org.apache.ignite.IgniteCompute;
- import org.apache.ignite.Ignition;
- import org.apache.ignite.cache.CacheAtomicityMode;
- import org.apache.ignite.cache.CacheMemoryMode;
- import org.apache.ignite.cache.CacheMetrics;
- import org.apache.ignite.cache.CacheMode;
- import org.apache.ignite.cache.CachePeekMode;
- import org.apache.ignite.configuration.CacheConfiguration;
- import org.apache.ignite.configuration.IgniteConfiguration;
- import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
- import org.apache.ignite.lang.IgniteFuture;
- import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
- import io.vertx.core.AbstractVerticle;
- import io.vertx.core.Context;
- import io.vertx.core.Future;
- import io.vertx.core.eventbus.MessageConsumer;
- import io.vertx.core.json.JsonArray;
- import io.vertx.core.json.JsonObject;
- public class Service extends AbstractVerticle {
- public static final String ADDRESS = "com.xxx.service.my";
- public static final String CACHE_NAME = "cache";
- private static AtomicBoolean cacheCreated = new AtomicBoolean(false);
- private static AtomicBoolean cacheStopped = new AtomicBoolean(false);
- private static Ignite ignite = null;
- private static IgniteCache<String, HashMap<String, HashSet<String>>> cache = null;
- public void start(Future<Void> startFuture) {
- // Make sure we init the grid only once.
- synchronized (cacheCreated) {
- if(cacheCreated.compareAndSet(false, true))
- {
- try {
- System.out.println("Creating key/value off-heap cache...");
- IgniteConfiguration igniteCfg = new IgniteConfiguration();
- igniteCfg.setMarshaller(new OptimizedMarshaller(true));
- CacheConfiguration<String, HashMap<String, HashSet<String>>> myCfg = new CacheConfiguration<>(CACHE_NAME);
- myCfg.setCacheMode(CacheMode.PARTITIONED);
- myCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
- myCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
- myCfg.setOffHeapMaxMemory(96 * 1024L * 1024L * 1024L);
- myCfg.setStartSize(50 * 1024 * 1024);
- myCfg.setBackups(0);
- ignite = Ignition.start(igniteCfg);
- cache = ignite.getOrCreateCache(myCfg).withAsync();
- vertx.setPeriodic(1000, timer -> {
- cache.size(CachePeekMode.ALL);
- IgniteFuture<Integer> fut = cache.future();
- fut.listen(f->{
- long offHeapSizeB = cache.metrics().getOffHeapAllocatedSize();
- long offHeapSizeGb = 0;
- if(offHeapSizeB > 0)
- offHeapSizeGb = cache.metrics().getOffHeapAllocatedSize() / 1024 / 1024 / 1024;
- System.out.println("Total: " + f.get() + " Local: " + cache.localSize(CachePeekMode.ALL) + " Off-Heap: " + offHeapSizeGb + "GB");
- });
- });
- startFuture.complete();
- } catch(Exception ex) {
- startFuture.fail(ex);
- }
- }
- }
- // This is called per HTTP POST request.
- MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer(ADDRESS);
- consumer.handler(rh -> {
- try {
- final JsonObject request = rh.body();
- int field1 = request.getInteger("f1").intValue();
- int field2 = request.getInteger("f2").intValue();
- final String field3 = request.getString("f3") + request.getString("f4");
- final String field4 = request.getString("f5");
- final String field5 = request.getString("f6");
- final String field6 = request.getString("f7") + request.getString("f8") + request.getString("f9") + request.getString("f10") + request.getString("f11") + request.getString("f12");
- final String field7 = request.getString("f13");
- final String field8 = request.getString("f14");
- final String field9 = request.getLong("f15").toString();
- final String field10 = request.getString("f16");
- final String field11 = request.getString("f17") + request.getString("f18") + request.getString("f19") + request.getString("f20") + request.getString("f21") + request.getString("f22");
- final Context ctx = vertx.getOrCreateContext();
- TreeSet<String> keys = new TreeSet<String>();
- keys.add(field3);
- keys.add(field3 + field1);
- keys.add(field3 + field2);
- keys.add(field4);
- keys.add(field4 + field1);
- keys.add(field4 + field2);
- keys.add(field5);
- keys.add(field5 + field1);
- keys.add(field5 + field2);
- keys.add(field6);
- keys.add(field6 + field1);
- keys.add(field6 + field2);
- keys.add(field7);
- keys.add(field7 + field1);
- keys.add(field7 + field2);
- keys.add(field8);
- keys.add(field8 + field1);
- keys.add(field8 + field2);
- keys.add(field9);
- keys.add(field9 + field1);
- keys.add(field9 + field2);
- keys.add(field10);
- keys.add(field10 + field1);
- keys.add(field10 + field2);
- keys.add(field11);
- keys.add(field11 + field1);
- keys.add(field11 + field2);
- cache.<String>invokeAll(keys, (entry, args) -> {
- HashMap<String, HashSet<String>> params = null;
- String mykey = entry.getKey();
- if(!entry.exists()){
- params = new HashMap<String, HashSet<String>>();
- params.put("field3", newHashSet(field3));
- params.put("field4", newHashSet(field4));
- params.put("field5", newHashSet(field5));
- params.put("field6", newHashSet(field6));
- params.put("field7", newHashSet(field7));
- params.put("field8", newHashSet(field8));
- params.put("field9", newHashSet(field9));
- params.put("field10", newHashSet(field10));
- params.put("field11", newHashSet(field11));
- cache.put(mykey, params);
- } else {
- params = entry.getValue();
- params.get("field3").add(field3);
- params.get("field4").add(field4);
- params.get("field5").add(field5);
- params.get("field6").add(field6);
- params.get("field7").add(field7);
- params.get("field8").add(field8);
- params.get("field9").add(field9);
- params.get("field10").add(field10);
- params.get("field11").add(field11);
- entry.setValue(params);
- }
- String counts =
- + params.get("field3").size() + ","
- + params.get("field4").size() + ","
- + params.get("field5").size() + ","
- + params.get("field6").size() + ","
- + params.get("field7").size() + ","
- + params.get("field8").size() + ","
- + params.get("field9").size() + ","
- + params.get("field10").size() + ","
- + params.get("field11").size()
- ;
- return counts;
- });
- cache.<Map<String,CacheInvokeResult<String>>>future().listen(ia -> {
- final JsonArray countss = new JsonArray();
- ia.get().forEach((k,v)->{
- countss.add(new JsonObject().put(k, v.get()));
- });
- ctx.runOnContext(ch -> {
- rh.reply(new JsonObject().put("counts", countss));
- });
- });
- } catch(Exception ex) {
- ex.printStackTrace();
- rh.reply(ex.toString());
- }
- });
- }
- public void stop() {
- synchronized (cacheStopped) {
- if(cacheStopped.compareAndSet(false, true))
- {
- System.out.println("Stopping cache...");
- Ignition.stop(true);
- }
- }
- }
- public static HashSet<String> newHashSet(String... strings) {
- HashSet<String> set = new HashSet<String>();
- for (String s : strings) {
- set.add(s);
- }
- return set;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement