Advertisement
Guest User

router-dealer example missing messages.

a guest
Sep 25th, 2012
171
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.75 KB | None | 0 0
  1. import org.zeromq.ZMQ;
  2. import java.util.concurrent.atomic.AtomicBoolean;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4.  
  5. public class RouterDealerExample {
  6.  
  7. static final String URI = "tcp://127.0.0.1:55552";
  8.  
  9. static final AtomicBoolean finished = new AtomicBoolean(false);
  10.  
  11. static final AtomicInteger SENT = new AtomicInteger(0);
  12. static final AtomicInteger RECEIVED = new AtomicInteger(0);
  13.  
  14. static AtomicInteger connectedCount = new AtomicInteger(0);
  15.  
  16. /**
  17. * Worker runnable consumes messages until it receives an END
  18. * message.
  19. */
  20. public static class Client implements Runnable {
  21. public final String name;
  22.  
  23. Client(String name) {
  24. this.name = name;
  25. }
  26.  
  27. public void run() {
  28. ZMQ.Context context = ZMQ.context(1);
  29. ZMQ.Socket socket = context.socket(ZMQ.DEALER);
  30. socket.setIdentity(name.getBytes());
  31. socket.connect(URI);
  32.  
  33. // Option 2. Send a message through to connect fully.
  34. // socket.send("".getBytes(), 0);
  35.  
  36. connectedCount.incrementAndGet();
  37. System.out.println("Connected-count=" + connectedCount.get());
  38.  
  39. while (!finished.get()) {
  40. byte[] data = socket.recv(0);
  41. RECEIVED.incrementAndGet();
  42. String msg = new String(data);
  43. System.out.println(String.format("[" + RECEIVED.get() + "/" + SENT.get() + "]Worker %s received '%s'", name, msg));
  44. }
  45.  
  46. socket.close();
  47. context.term();
  48. }
  49. }
  50.  
  51. private static void initClients() {
  52. for (int i = 0; i < clientCount; i++) {
  53. Thread workerA = new Thread(new Client("client-" + i));
  54. workerA.start();
  55. }
  56. }
  57.  
  58. static int clientCount = 50;
  59.  
  60. private static void initServer() throws Exception {
  61. new Thread() {
  62. @Override
  63. public void run() {
  64. ZMQ.Context context = null;
  65. ZMQ.Socket socket = null;
  66. try {
  67. context = ZMQ.context(1);
  68. socket = context.socket(ZMQ.ROUTER);
  69. socket.bind(URI);
  70.  
  71. // Option 2.
  72. // socket.send("".getBytes(), 0);
  73.  
  74. // Option 1. this will work usually, to ensure endpoints are connected.
  75. // Thread.sleep(100);
  76.  
  77. while (connectedCount.get() < clientCount) {
  78. Thread.sleep(10);
  79. System.out.println("Waiting for connections ...");
  80. }
  81.  
  82. // Send 10 tasks, scattered to A twice as often as B.
  83. for (int i = 0; i < 10; i++) {
  84. for (int j = 0; j < clientCount; j++) {
  85. SENT.incrementAndGet();
  86. socket.send(("client-" + j).getBytes(), ZMQ.SNDMORE);
  87. socket.send(("Workload[" + i + "/total-sent=" + SENT.get() + "] for client-" + j).getBytes(), 0);
  88. }
  89. }
  90.  
  91. Thread.sleep(1000); // let messages send and arrive.
  92. System.out.println("Total counts [sent=" + SENT.get() + ", received=" + RECEIVED.get() + "]");
  93. } catch (Exception e) {
  94. System.out.println("Failed");
  95. e.printStackTrace(System.err);
  96. } finally {
  97. socket.close();
  98. context.term();
  99. }
  100. }
  101. }.start();
  102. }
  103.  
  104. public static void main(String[] args) throws Exception {
  105. initServer();
  106. initClients();
  107. }
  108. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement