Advertisement
Guest User

Ignite Service

a guest
Apr 14th, 2015
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.18 KB | None | 0 0
  1. package com.xxx.service;
  2.  
  3. import java.util.ArrayList;
  4. import java.util.Collection;
  5. import java.util.HashSet;
  6. import java.util.concurrent.atomic.AtomicBoolean;
  7.  
  8. import org.apache.ignite.Ignite;
  9. import org.apache.ignite.IgniteCache;
  10. import org.apache.ignite.IgniteCompute;
  11. import org.apache.ignite.Ignition;
  12. import org.apache.ignite.cache.CacheAtomicityMode;
  13. import org.apache.ignite.cache.CacheMode;
  14. import org.apache.ignite.cache.CachePeekMode;
  15. import org.apache.ignite.cache.query.SqlQuery;
  16. import org.apache.ignite.configuration.CacheConfiguration;
  17. import org.apache.ignite.configuration.IgniteConfiguration;
  18. import org.apache.ignite.lang.*;
  19. import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
  20.  
  21. import io.vertx.core.AbstractVerticle;
  22. import io.vertx.core.Future;
  23. import io.vertx.core.json.JsonObject;
  24.  
  25. import com.xxx.utils.StringUtil;
  26. import com.xxx.model.MyTrx2;
  27.  
  28. public class MyService2 extends AbstractVerticle {
  29.  
  30.     public static final String ADDRESS = "com.xxx.service.my2";
  31.    
  32.     private static AtomicBoolean cacheCreated = new AtomicBoolean(false);  
  33.     private static AtomicBoolean cacheStopped = new AtomicBoolean(false);  
  34.     private static Ignite ignite = null;
  35.     private static IgniteCache<Long, MyTrx2> myCache = null;
  36.  
  37.     public void start(Future<Void> startFuture) {
  38.        
  39.         // Make sure we init the grid only once.
  40.         synchronized (cacheCreated) {
  41.             if(cacheCreated.compareAndSet(false, true))
  42.             {
  43.                         try {
  44.                             System.out.println("Creating cache for MyService2 ...");
  45.  
  46.                             IgniteConfiguration igniteCfg = new IgniteConfiguration();
  47.                             igniteCfg.setMarshaller(new OptimizedMarshaller(true));
  48.                            
  49.                             CacheConfiguration<Long, com.xxx.model.MyTrx2> cacheCfg = new CacheConfiguration<>("mycache");
  50.                             cacheCfg.setCacheMode(CacheMode.PARTITIONED);
  51.                             cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
  52.                             cacheCfg.setBackups(0);
  53.                             cacheCfg.setIndexedTypes(Long.class, com.xxx.model.MyTrx2.class);
  54.                            
  55.                            
  56.                             ignite = Ignition.start(igniteCfg);
  57.                            
  58.                             try {
  59.                                 myCache = ignite.createCache(cacheCfg);
  60.                             } catch(Exception ex) {
  61.                                 myCache = ignite.cache("mycache");
  62.                             }
  63.                             //compute = ignite.compute().withAsync();
  64.  
  65.                             vertx.setPeriodic(1000, timer -> {
  66.                                 System.out.println("Total: " + myCache.size(CachePeekMode.ALL) + " Local: " + myCache.localSize(CachePeekMode.ALL));
  67.                             });
  68.  
  69.                             startFuture.complete();
  70.                         } catch(Exception ex) {
  71.                             startFuture.fail(ex);
  72.                         }          
  73.             }
  74.         }
  75.        
  76.        
  77.         // This is called per HTTP POST request.
  78.         vertx.eventBus().consumer(ADDRESS, handler -> {
  79.             try {
  80.                 JsonObject body = new JsonObject(handler.body().toString());
  81.                 //System.out.println(body.encodePrettily());
  82.                
  83.                
  84.                 MyTrx2 trxRequest = new MyTrx2();
  85.                 trxRequest.setId(body.getLong("id").longValue());
  86.                 trxRequest.setFullName(StringUtil.lower(body.getString("fN") + body.getString("lN")));
  87.  
  88.                 myCache.put(trxRequest.getId(), trxRequest);
  89.  
  90.                 Collection<IgniteFuture<HashSet<MyTrx2>>> futs = new ArrayList<>();
  91.                
  92.                 IgniteCompute compute = ignite.compute().withAsync();
  93.  
  94.                 compute.broadcast(
  95.                         (MyTrx2 myReq) -> {
  96.                             String sqlStr = "FROM MyTrx2 WHERE fullName = ?";
  97.                             SqlQuery sql = new SqlQuery(MyTrx2.class, sqlStr);
  98.                             sql.setArgs(trxRequest.getFullName());
  99.                             sql.setLocal(true);
  100.  
  101.                             //List<Entry<Long, Transaction>> list =
  102.                            
  103.                             HashSet<MyTrx2> list = new HashSet<MyTrx2>(myCache.query(sql.setSql(sqlStr)).getAll());
  104.                             return list;
  105.                         },
  106.                         trxRequest
  107.                     );
  108.                 futs.add(compute.future());            
  109.  
  110.                 futs.forEach(IgniteFuture::get);
  111.  
  112.                 // Do something with response here...
  113.                 futs.forEach(fut -> {                              
  114.                 });
  115.  
  116.                 handler.reply("Hello Grid Gain Query!");
  117.                
  118.             } catch(Exception ex) {
  119.                 ex.printStackTrace();
  120.  
  121.                 handler.reply(ex.toString());
  122.             }          
  123.         });
  124.     }
  125.    
  126.     public void stop() {
  127.         synchronized (cacheStopped) {
  128.            
  129.             if(cacheStopped.compareAndSet(false, true))
  130.             {
  131.                 System.out.println("Stopping cahce...");
  132.                 Ignition.stop(true);
  133.            
  134.             }
  135.         }
  136.     }
  137. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement