Advertisement
Guest User

Ignite

a guest
Sep 1st, 2015
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 7.60 KB | None | 0 0
  1. package com.xxx.service;
  2.  
  3. import java.util.HashMap;
  4. import java.util.HashSet;
  5. import java.util.Map;
  6. import java.util.TreeSet;
  7. import java.util.concurrent.atomic.AtomicBoolean;
  8.  
  9. import org.apache.ignite.Ignite;
  10. import org.apache.ignite.IgniteCache;
  11. import org.apache.ignite.IgniteCompute;
  12. import org.apache.ignite.Ignition;
  13. import org.apache.ignite.cache.CacheAtomicityMode;
  14. import org.apache.ignite.cache.CacheMemoryMode;
  15. import org.apache.ignite.cache.CacheMetrics;
  16. import org.apache.ignite.cache.CacheMode;
  17. import org.apache.ignite.cache.CachePeekMode;
  18. import org.apache.ignite.configuration.CacheConfiguration;
  19. import org.apache.ignite.configuration.IgniteConfiguration;
  20. import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
  21. import org.apache.ignite.lang.IgniteFuture;
  22. import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
  23.  
  24. import io.vertx.core.AbstractVerticle;
  25. import io.vertx.core.Context;
  26. import io.vertx.core.Future;
  27. import io.vertx.core.eventbus.MessageConsumer;
  28. import io.vertx.core.json.JsonArray;
  29. import io.vertx.core.json.JsonObject;
  30.  
  31. public class Service extends AbstractVerticle {
  32.  
  33.     public static final String ADDRESS = "com.xxx.service.my";
  34.     public static final String CACHE_NAME = "cache";
  35.    
  36.     private static AtomicBoolean cacheCreated = new AtomicBoolean(false);  
  37.     private static AtomicBoolean cacheStopped = new AtomicBoolean(false);  
  38.     private static Ignite ignite = null;
  39.     private static IgniteCache<String, HashMap<String, HashSet<String>>> cache = null;
  40.  
  41.     public void start(Future<Void> startFuture) {
  42.        
  43.         // Make sure we init the grid only once.
  44.         synchronized (cacheCreated) {
  45.             if(cacheCreated.compareAndSet(false, true))
  46.             {
  47.                         try {
  48.                             System.out.println("Creating key/value off-heap cache...");
  49.  
  50.                             IgniteConfiguration igniteCfg = new IgniteConfiguration();
  51.                             igniteCfg.setMarshaller(new OptimizedMarshaller(true));
  52.  
  53.                             CacheConfiguration<String, HashMap<String, HashSet<String>>> myCfg = new CacheConfiguration<>(CACHE_NAME);
  54.                             myCfg.setCacheMode(CacheMode.PARTITIONED);
  55.                             myCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
  56.                             myCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
  57.                             myCfg.setOffHeapMaxMemory(96 * 1024L * 1024L * 1024L);
  58.                             myCfg.setStartSize(50 * 1024 * 1024);
  59.                             myCfg.setBackups(0);
  60.  
  61.                             ignite = Ignition.start(igniteCfg);                    
  62.                             cache = ignite.getOrCreateCache(myCfg).withAsync();
  63.  
  64.                             vertx.setPeriodic(1000, timer -> {
  65.                                 cache.size(CachePeekMode.ALL);
  66.                                 IgniteFuture<Integer> fut = cache.future();
  67.                                 fut.listen(f->{
  68.                                     long offHeapSizeB = cache.metrics().getOffHeapAllocatedSize();
  69.                                     long offHeapSizeGb = 0;
  70.                                     if(offHeapSizeB > 0)
  71.                                         offHeapSizeGb =  cache.metrics().getOffHeapAllocatedSize() / 1024 / 1024 / 1024;
  72.                                    
  73.                                     System.out.println("Total: " +  f.get() + " Local: " + cache.localSize(CachePeekMode.ALL) + " Off-Heap: " + offHeapSizeGb + "GB");
  74.                                 });
  75.                             });
  76.  
  77.                             startFuture.complete();
  78.                         } catch(Exception ex) {
  79.                             startFuture.fail(ex);
  80.                         }          
  81.             }
  82.         }
  83.  
  84.         // This is called per HTTP POST request.
  85.         MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer(ADDRESS);
  86.         consumer.handler(rh -> {
  87.             try {
  88.                 final JsonObject request = rh.body();
  89.  
  90.                 int field1 = request.getInteger("f1").intValue();
  91.                 int field2 = request.getInteger("f2").intValue();
  92.                 final String field3 = request.getString("f3") + request.getString("f4");
  93.                 final String field4 = request.getString("f5");
  94.                 final String field5 = request.getString("f6");
  95.                 final String field6 = request.getString("f7") + request.getString("f8") + request.getString("f9") + request.getString("f10") + request.getString("f11") + request.getString("f12");
  96.                 final String field7 = request.getString("f13");
  97.                 final String field8 = request.getString("f14");
  98.                 final String field9 = request.getLong("f15").toString();
  99.                 final String field10 = request.getString("f16");
  100.                 final String field11 = request.getString("f17") + request.getString("f18") + request.getString("f19") + request.getString("f20") + request.getString("f21") + request.getString("f22");
  101.  
  102.                 final Context ctx = vertx.getOrCreateContext();
  103.                 TreeSet<String> keys = new TreeSet<String>();
  104.                 keys.add(field3);
  105.                 keys.add(field3 + field1);
  106.                 keys.add(field3 + field2);
  107.                 keys.add(field4);
  108.                 keys.add(field4 + field1);
  109.                 keys.add(field4 + field2);
  110.                 keys.add(field5);
  111.                 keys.add(field5 + field1);
  112.                 keys.add(field5 + field2);
  113.                 keys.add(field6);
  114.                 keys.add(field6 + field1);
  115.                 keys.add(field6 + field2);
  116.                 keys.add(field7);
  117.                 keys.add(field7 + field1);
  118.                 keys.add(field7 + field2);
  119.                 keys.add(field8);
  120.                 keys.add(field8 + field1);
  121.                 keys.add(field8 + field2);
  122.                 keys.add(field9);
  123.                 keys.add(field9 + field1);
  124.                 keys.add(field9 + field2);
  125.                 keys.add(field10);
  126.                 keys.add(field10 + field1);
  127.                 keys.add(field10 + field2);
  128.                 keys.add(field11);
  129.                 keys.add(field11 + field1);
  130.                 keys.add(field11 + field2);
  131.  
  132.                 cache.<String>invokeAll(keys, (entry, args) -> {
  133.                     HashMap<String, HashSet<String>> params = null;
  134.                     String mykey = entry.getKey();
  135.  
  136.                     if(!entry.exists()){
  137.                         params = new HashMap<String, HashSet<String>>();
  138.    
  139.                         params.put("field3", newHashSet(field3));
  140.                         params.put("field4", newHashSet(field4));
  141.                         params.put("field5", newHashSet(field5));
  142.                         params.put("field6", newHashSet(field6));
  143.                         params.put("field7", newHashSet(field7));
  144.                         params.put("field8", newHashSet(field8));
  145.                         params.put("field9", newHashSet(field9));
  146.                         params.put("field10", newHashSet(field10));
  147.                         params.put("field11", newHashSet(field11));
  148.    
  149.                         cache.put(mykey, params);
  150.                     } else {
  151.                         params = entry.getValue();
  152.  
  153.                         params.get("field3").add(field3);
  154.                         params.get("field4").add(field4);
  155.                         params.get("field5").add(field5);
  156.                         params.get("field6").add(field6);
  157.                         params.get("field7").add(field7);
  158.                         params.get("field8").add(field8);
  159.                         params.get("field9").add(field9);
  160.                         params.get("field10").add(field10);
  161.                         params.get("field11").add(field11);
  162.    
  163.                         entry.setValue(params);
  164.                     }
  165.  
  166.                     String counts =
  167.                             + params.get("field3").size() + ","
  168.                             + params.get("field4").size() + ","
  169.                             + params.get("field5").size() + ","
  170.                             + params.get("field6").size() + ","
  171.                             + params.get("field7").size() + ","
  172.                             + params.get("field8").size() + ","
  173.                             + params.get("field9").size() + ","
  174.                             + params.get("field10").size() + ","
  175.                             + params.get("field11").size()
  176.                             ;                  
  177.  
  178.                     return counts;
  179.                 });
  180.                 cache.<Map<String,CacheInvokeResult<String>>>future().listen(ia -> {
  181.                    
  182.                     final JsonArray countss = new JsonArray();
  183.                    
  184.                     ia.get().forEach((k,v)->{
  185.                         countss.add(new JsonObject().put(k, v.get()));
  186.                     });
  187.  
  188.                     ctx.runOnContext(ch -> {
  189.                         rh.reply(new JsonObject().put("counts", countss)); 
  190.                     });                    
  191.                    
  192.                 });
  193.             } catch(Exception ex) {
  194.                 ex.printStackTrace();
  195.  
  196.                 rh.reply(ex.toString());
  197.             }          
  198.         });
  199.     }
  200.    
  201.     public void stop() {
  202.         synchronized (cacheStopped) {
  203.            
  204.             if(cacheStopped.compareAndSet(false, true))
  205.             {
  206.                 System.out.println("Stopping cache...");
  207.                 Ignition.stop(true);
  208.            
  209.             }
  210.         }
  211.     }
  212.    
  213.     public static HashSet<String> newHashSet(String... strings) {
  214.         HashSet<String> set = new HashSet<String>();
  215.  
  216.         for (String s : strings) {
  217.             set.add(s);
  218.         }
  219.         return set;
  220.     }    
  221. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement