Advertisement
Guest User

zeromq java server (DEALER) + workers / c client (REQ)

a guest
Sep 27th, 2012
221
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.77 KB | None | 0 0
  1. // java server
  2. package com.onstar.ccg.ebike;
  3.  
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6.  
  7. import org.zeromq.ZFrame;
  8. import org.zeromq.ZMQ;
  9. import org.zeromq.ZMsg;
  10.  
  11. /**
  12. * You must pass the following JVM arg:
  13. * <tt>-Djava.library.path=/usr/local/lib</tt>
  14. * or you'll have to specify the library environment variable for you platform:
  15. * <tt>DYLD_LIBRARY_PATH</tt> for Mac or <tt>LD_LIBRARY_PATH</tt> for Linux
  16. */
  17. public class BrokerRunner {
  18.  
  19. private static String FRONTEND_URL = "tcp://*:"+5555;
  20. // can't bind to * here for some reason
  21. private static String BACKEND_URL = "tcp://127.0.0.1:"+5556;
  22.  
  23. public static void main(String[] args) {
  24. System.out.println("ZeroMQ Version: "+ZMQ.getVersionString());
  25.  
  26. ExecutorService executor = Executors.newFixedThreadPool(2);
  27. Thread brokerThread = new Thread(new Broker());
  28. Thread workerThread = new Thread(new Worker());
  29.  
  30. brokerThread.setDaemon(true);
  31. workerThread.setDaemon(true);
  32.  
  33. executor.execute(brokerThread);
  34. executor.execute(workerThread);
  35.  
  36. executor.shutdown();
  37. while (!executor.isTerminated()) {
  38.  
  39. }
  40. System.out.println("finished");
  41. }
  42.  
  43. static class Broker implements Runnable {
  44.  
  45. @Override
  46. public void run() {
  47. ZMQ.Context context = ZMQ.context(1);
  48. ZMQ.Socket frontend = context.socket(ZMQ.ROUTER);
  49. ZMQ.Socket backend = context.socket(ZMQ.ROUTER);
  50. frontend.bind(FRONTEND_URL);
  51. backend.bind(BACKEND_URL);
  52.  
  53. while(!Thread.currentThread().isInterrupted()) {
  54. ZMQ.Poller items = context.poller();
  55. items.register(frontend, ZMQ.Poller.POLLIN);
  56. items.register(backend, ZMQ.Poller.POLLIN);
  57.  
  58. if (items.poll() == -1) {
  59. System.out.println("interrupted");
  60. break;
  61. }
  62.  
  63. // poll frontend
  64. if (items.pollin(0)) {
  65. System.out.println("waiting to receive on frontend...");
  66. ZMsg msg = ZMsg.recvMsg(frontend);
  67. System.out.println("Got a message on frontend...");
  68. if (msg == null)
  69. break; // Interrupted
  70. ZFrame address = msg.pop();
  71. System.out.println("\tpopped address frame: "+address);
  72. address.destroy();
  73. System.out.println("\tpushed 'W' address frame and sending to backend..");
  74. msg.addFirst(new ZFrame("W"));
  75. msg.send(backend);
  76. }
  77.  
  78. // poll backend
  79. if (items.pollin(1)) {
  80. System.out.println("waiting to receive on backend...");
  81. ZMsg msg = ZMsg.recvMsg(backend);
  82. System.out.println("Got a message on backend...");
  83. if (msg == null)
  84. break; // Interruped
  85. ZFrame address = msg.pop();
  86. System.out.println("\tpopped address frame: "+address);
  87. address.destroy();
  88. System.out.println("\tpushed 'C' address frame and sending to frontend..");
  89. msg.addFirst(new ZFrame("C"));
  90. msg.send(frontend);
  91. }
  92. }
  93. System.out.println("terminating broker context");
  94. context.term();
  95. }
  96.  
  97. }
  98.  
  99. static class Worker implements Runnable {
  100.  
  101. @Override
  102. public void run() {
  103. ZMQ.Context context = ZMQ.context(1);
  104. ZMQ.Socket worker = context.socket(ZMQ.DEALER);
  105. worker.setIdentity("W".getBytes());
  106. worker.connect(BACKEND_URL);
  107. while (!Thread.currentThread().isInterrupted()) {
  108. ZMsg msg = ZMsg.recvMsg(worker);
  109. System.out.println(String.format("received message on worker:%s", msg));
  110. msg.send(worker);
  111. }
  112. System.out.println("terminating worker context");
  113. context.term();
  114. }
  115.  
  116. }
  117.  
  118. }
  119.  
  120. // C client
  121. //////////////////////////////////////////////////////
  122. /// Initializes ZeroMQ context and connects to server url
  123. /// @param ip the IP address of the server
  124. /// @param port the port number of the server
  125. /// @return 0 for success; non-zero otherwise
  126. int comm_new(const char* ip, const char* port) {
  127. snprintf(server, 40, "%s%s%s%s", "tcp://", ip, ":", port);
  128. // create context
  129. ctx = zctx_new();
  130. client = zsocket_new(ctx, ZMQ_REQ);
  131. int ret = zsocket_connect(client, server);
  132. if (ret != 0) { dumpError("zsocket_connect", 21); }
  133. zsocket_set_identity(client, "C");
  134. return ret;
  135. }
  136.  
  137. //////////////////////////////////////////////////////
  138. /// Run a synchronous test with server
  139. /// @param cnt the number of iterations
  140. /// @param debug whether to print debug statements
  141. void comm_send_synch(int cnt, bool debug) {
  142. printf("Synchronous round-trip test...\n");
  143. int requests;
  144. int64_t start = zclock_time();
  145.  
  146. for (requests = 0; requests < cnt; requests++) {
  147.  
  148. int ret = zstr_send (client, "Hello");
  149.  
  150. if (ret != 0) {
  151. if (debug) { dumpError ("zstr_send", 90); }
  152. } else {
  153.  
  154. char *reply = zstr_recv (client);
  155. if (debug) { printf ("\n\treceived: %s\n", reply); }
  156. free (reply);
  157. }
  158. }
  159. printf("\n\t%d calls/second\n",
  160. (1000 * cnt) / (int) (zclock_time() - start));
  161. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement