Advertisement
Guest User

Untitled

a guest
Jul 28th, 2016
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.34 KB | None | 0 0
  1. package com.smadan.chicago;
  2.  
  3. import com.google.common.primitives.Ints;
  4. import com.google.common.primitives.Longs;
  5. import com.google.common.util.concurrent.FutureCallback;
  6. import com.google.common.util.concurrent.Futures;
  7. import com.google.common.util.concurrent.ListenableFuture;
  8. import com.xjeffrose.chicago.client.ChicagoClient;
  9. import com.xjeffrose.chicago.client.ChicagoClientException;
  10. import com.xjeffrose.chicago.client.ChicagoClientTimeoutException;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Random;
  14. import java.util.concurrent.CountDownLatch;
  15. import java.util.concurrent.ExecutionException;
  16. import java.util.concurrent.ExecutorService;
  17. import java.util.concurrent.Executors;
  18. import java.util.concurrent.TimeUnit;
  19. import java.util.concurrent.TimeoutException;
  20. import java.util.concurrent.atomic.AtomicInteger;
  21. import javax.annotation.Nullable;
  22.  
  23. /**
  24. * Created by root on 6/23/16.
  25. */
  26. public class WriteTest implements Runnable {
  27. private final static String key = "ppfe-test";
  28. ChicagoClient cts;
  29. private final CountDownLatch latch;
  30. private static AtomicInteger success = new AtomicInteger(0);
  31. private static AtomicInteger failure = new AtomicInteger(0);
  32. private static AtomicInteger readSuccess = new AtomicInteger(0);
  33. private static AtomicInteger readFailure = new AtomicInteger(0);
  34. private static AtomicInteger timeouts = new AtomicInteger(0);
  35. private final static List<ListenableFuture<byte[]>> futures = new ArrayList<ListenableFuture<byte[]>>();
  36. private static Long[] keys;
  37. int valCount;
  38.  
  39. public WriteTest(CountDownLatch latch,ChicagoClient cts, int valCount){
  40. this.latch = latch;
  41. this.cts=cts;
  42. this.valCount=valCount;
  43. }
  44.  
  45. @Override
  46. public void run(){
  47. try{
  48. String v = "val" +valCount + "TTE";
  49. byte[] val = v.getBytes();
  50. System.arraycopy(v.getBytes(),0,val,0,v.getBytes().length);
  51. ListenableFuture<List<byte[]>> future = cts.tsWrite(key.getBytes(),val);
  52. future.get(1000,TimeUnit.MILLISECONDS);
  53. Futures.addCallback(future, new FutureCallback<List<byte[]>>() {
  54. @Override
  55. public void onSuccess(@Nullable List<byte[]> bytes) {
  56. long o = Longs.fromByteArray(bytes.get(0));
  57. System.out.println(o);
  58. success.getAndIncrement();
  59. latch.countDown();
  60. }
  61.  
  62. @Override
  63. public void onFailure(Throwable throwable) {
  64. // TODO(JR): Maybe Try again?
  65. throwable.printStackTrace();
  66. failure.getAndIncrement();
  67. latch.countDown();
  68. }
  69. });
  70. } catch (ChicagoClientTimeoutException e){
  71. System.out.println(e.getMessage());
  72. failure.getAndIncrement();
  73. e.printStackTrace();
  74. } catch (ChicagoClientException e) {
  75. System.out.println(e.getMessage());
  76. failure.getAndIncrement();
  77. e.printStackTrace();
  78. } catch (InterruptedException e) {
  79. e.printStackTrace();
  80. } catch (ExecutionException e) {
  81. e.printStackTrace();
  82. } catch (TimeoutException e) {
  83. e.printStackTrace();
  84. latch.countDown();
  85. } finally {
  86.  
  87. }
  88. }
  89.  
  90. public static void main(String[] args) throws Exception {
  91.  
  92. final int loop = Integer.parseInt(args[0]);
  93. final int workerSize = Integer.parseInt(args[1]);
  94. final int clients = Integer.parseInt(args[2]);
  95. ExecutorService executor = Executors.newFixedThreadPool(workerSize);
  96. CountDownLatch latch = new CountDownLatch(loop);
  97. ChicagoClient[] ctsa = new ChicagoClient[clients];
  98. keys = new Long[loop];
  99. for(int i =0;i<clients;i++){
  100. //ctsa[i] = new ChicagoClient("10.22.100.183:2181,10.25.180.234:2181,10.22.103.86:2181,10.25.180.247:2181,10.25.69.226:2181",3);
  101. //ctsa[i] = new ChicagoClient("10.24.25.188:2181,10.24.25.189:2181,10.25.145.56:2181,10.24.33.123:2181",3);
  102. //ctsa[i].startAndWaitForNodes(4);
  103. ctsa[i] = new ChicagoClient("10.24.25.188:12000");
  104. Thread.sleep(500);
  105. }
  106.  
  107. System.out.println("######## Statring writes #########");
  108. long startTime = System.currentTimeMillis();
  109. for (int i = 0; i < loop; i++) {
  110. executor.submit(new WriteTest(latch,ctsa[i%clients],i));
  111. }
  112.  
  113. latch.await();
  114.  
  115. System.out.println("Total time taken for "+loop+ " writes ="+ (System.currentTimeMillis() - startTime) + "ms");
  116. System.out.println("Total success :"+ success.get() + " Failures :"+ failure.get() + "Timeouts :"+ timeouts.get());
  117. System.out.println("######## Writes completed #########");
  118. System.out.println();
  119. System.out.println();
  120.  
  121. //System.out.println("######## Verifying the writes #########");
  122. //System.out.println("Randomly reading 5 values");
  123. //Random ran = new Random();
  124. //for(int i =0;i<5;i++){
  125. //
  126. // long curkey = keys[ran.nextInt(loop)];
  127. // try{
  128. // String returnVal = new String(ctsa[(int)curkey%clients].read(key.getBytes(), Longs.toByteArray(curkey)).get());
  129. // //System.out.println(curkey +" :"+returnVal);
  130. // if(returnVal.startsWith("val")){
  131. // readSuccess.getAndIncrement();
  132. // }else{
  133. // readFailure.getAndIncrement();
  134. // }
  135. // }catch(Exception e){
  136. // e.printStackTrace();
  137. // readFailure.getAndIncrement();
  138. // }finally {
  139. // }
  140. //}
  141. //System.out.println("Total read success :"+ readSuccess.get() + " Failures :"+ readFailure.get());
  142. executor.shutdownNow();
  143.  
  144. System.exit(0);
  145. }
  146. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement