Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.xxx.service;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.HashSet;
- import java.util.concurrent.atomic.AtomicBoolean;
- import org.apache.ignite.Ignite;
- import org.apache.ignite.IgniteCache;
- import org.apache.ignite.IgniteCompute;
- import org.apache.ignite.Ignition;
- import org.apache.ignite.cache.CacheAtomicityMode;
- import org.apache.ignite.cache.CacheMode;
- import org.apache.ignite.cache.CachePeekMode;
- import org.apache.ignite.cache.query.SqlQuery;
- import org.apache.ignite.configuration.CacheConfiguration;
- import org.apache.ignite.configuration.IgniteConfiguration;
- import org.apache.ignite.lang.*;
- import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
- import io.vertx.core.AbstractVerticle;
- import io.vertx.core.Future;
- import io.vertx.core.json.JsonObject;
- import com.xxx.utils.StringUtil;
- import com.xxx.model.MyTrx2;
- public class MyService2 extends AbstractVerticle {
- public static final String ADDRESS = "com.xxx.service.my2";
- private static AtomicBoolean cacheCreated = new AtomicBoolean(false);
- private static AtomicBoolean cacheStopped = new AtomicBoolean(false);
- private static Ignite ignite = null;
- private static IgniteCache<Long, MyTrx2> myCache = null;
- public void start(Future<Void> startFuture) {
- // Make sure we init the grid only once.
- synchronized (cacheCreated) {
- if(cacheCreated.compareAndSet(false, true))
- {
- try {
- System.out.println("Creating cache for MyService2 ...");
- IgniteConfiguration igniteCfg = new IgniteConfiguration();
- igniteCfg.setMarshaller(new OptimizedMarshaller(true));
- CacheConfiguration<Long, com.xxx.model.MyTrx2> cacheCfg = new CacheConfiguration<>("mycache");
- cacheCfg.setCacheMode(CacheMode.PARTITIONED);
- cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
- cacheCfg.setBackups(0);
- cacheCfg.setIndexedTypes(Long.class, com.xxx.model.MyTrx2.class);
- ignite = Ignition.start(igniteCfg);
- try {
- myCache = ignite.createCache(cacheCfg);
- } catch(Exception ex) {
- myCache = ignite.cache("mycache");
- }
- //compute = ignite.compute().withAsync();
- vertx.setPeriodic(1000, timer -> {
- System.out.println("Total: " + myCache.size(CachePeekMode.ALL) + " Local: " + myCache.localSize(CachePeekMode.ALL));
- });
- startFuture.complete();
- } catch(Exception ex) {
- startFuture.fail(ex);
- }
- }
- }
- // This is called per HTTP POST request.
- vertx.eventBus().consumer(ADDRESS, handler -> {
- try {
- JsonObject body = new JsonObject(handler.body().toString());
- //System.out.println(body.encodePrettily());
- MyTrx2 trxRequest = new MyTrx2();
- trxRequest.setId(body.getLong("id").longValue());
- trxRequest.setFullName(StringUtil.lower(body.getString("fN") + body.getString("lN")));
- myCache.put(trxRequest.getId(), trxRequest);
- Collection<IgniteFuture<HashSet<MyTrx2>>> futs = new ArrayList<>();
- IgniteCompute compute = ignite.compute().withAsync();
- compute.broadcast(
- (MyTrx2 myReq) -> {
- String sqlStr = "FROM MyTrx2 WHERE fullName = ?";
- SqlQuery sql = new SqlQuery(MyTrx2.class, sqlStr);
- sql.setArgs(trxRequest.getFullName());
- sql.setLocal(true);
- //List<Entry<Long, Transaction>> list =
- HashSet<MyTrx2> list = new HashSet<MyTrx2>(myCache.query(sql.setSql(sqlStr)).getAll());
- return list;
- },
- trxRequest
- );
- futs.add(compute.future());
- futs.forEach(IgniteFuture::get);
- // Do something with response here...
- futs.forEach(fut -> {
- });
- handler.reply("Hello Grid Gain Query!");
- } catch(Exception ex) {
- ex.printStackTrace();
- handler.reply(ex.toString());
- }
- });
- }
- public void stop() {
- synchronized (cacheStopped) {
- if(cacheStopped.compareAndSet(false, true))
- {
- System.out.println("Stopping cahce...");
- Ignition.stop(true);
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement