SHARE
TWEET

zeromq java server (DEALER) + workers / c client (REQ)

a guest Sep 27th, 2012 161 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. // java server
  2. package com.onstar.ccg.ebike;
  3.  
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6.  
  7. import org.zeromq.ZFrame;
  8. import org.zeromq.ZMQ;
  9. import org.zeromq.ZMsg;
  10.  
  11. /**
  12.  * You must pass the following JVM arg:
  13.  * <tt>-Djava.library.path=/usr/local/lib</tt>
  14.  * or you'll have to specify the library environment variable for you platform:
  15.  * <tt>DYLD_LIBRARY_PATH</tt> for Mac or <tt>LD_LIBRARY_PATH</tt> for Linux
  16.  */
  17. public class BrokerRunner {
  18.        
  19.         private static String FRONTEND_URL = "tcp://*:"+5555;
  20.         // can't bind to * here for some reason
  21.         private static String BACKEND_URL = "tcp://127.0.0.1:"+5556;
  22.  
  23.         public static void main(String[] args) {
  24.                 System.out.println("ZeroMQ Version: "+ZMQ.getVersionString());
  25.                
  26.                 ExecutorService executor = Executors.newFixedThreadPool(2);
  27.                 Thread brokerThread = new Thread(new Broker());
  28.                 Thread workerThread = new Thread(new Worker());
  29.                
  30.                 brokerThread.setDaemon(true);
  31.                 workerThread.setDaemon(true);
  32.                
  33.                 executor.execute(brokerThread);
  34.                 executor.execute(workerThread);
  35.                
  36.                 executor.shutdown();
  37.                 while (!executor.isTerminated()) {
  38.                        
  39.                 }
  40.                 System.out.println("finished");
  41.         }
  42.        
  43.         static class Broker implements Runnable {
  44.  
  45.                 @Override
  46.                 public void run() {
  47.                         ZMQ.Context context = ZMQ.context(1);
  48.                         ZMQ.Socket frontend = context.socket(ZMQ.ROUTER);
  49.                         ZMQ.Socket backend = context.socket(ZMQ.ROUTER);
  50.                         frontend.bind(FRONTEND_URL);
  51.                         backend.bind(BACKEND_URL);
  52.                        
  53.                         while(!Thread.currentThread().isInterrupted()) {
  54.                                 ZMQ.Poller items = context.poller();
  55.                                 items.register(frontend, ZMQ.Poller.POLLIN);
  56.                                 items.register(backend, ZMQ.Poller.POLLIN);
  57.                                
  58.                                 if (items.poll() == -1) {
  59.                                         System.out.println("interrupted");
  60.                                         break;
  61.                                 }
  62.                                
  63.                                 // poll frontend
  64.                                 if (items.pollin(0)) {
  65.                                         System.out.println("waiting to receive on frontend...");
  66.                                         ZMsg msg = ZMsg.recvMsg(frontend);
  67.                                         System.out.println("Got a message on frontend...");
  68.                                         if (msg == null)
  69.                                                 break; // Interrupted
  70.                                         ZFrame address = msg.pop();
  71.                                         System.out.println("\tpopped address frame: "+address);
  72.                                         address.destroy();
  73.                                         System.out.println("\tpushed 'W' address frame and sending to backend..");
  74.                                         msg.addFirst(new ZFrame("W"));
  75.                                         msg.send(backend);
  76.                                 }
  77.                                
  78.                                 // poll backend
  79.                                 if (items.pollin(1)) {
  80.                                         System.out.println("waiting to receive on backend...");
  81.                                         ZMsg msg = ZMsg.recvMsg(backend);
  82.                                         System.out.println("Got a message on backend...");
  83.                                         if (msg == null)
  84.                                                 break; // Interruped
  85.                                         ZFrame address = msg.pop();
  86.                                         System.out.println("\tpopped address frame: "+address);
  87.                                         address.destroy();
  88.                                         System.out.println("\tpushed 'C' address frame and sending to frontend..");
  89.                                         msg.addFirst(new ZFrame("C"));
  90.                                         msg.send(frontend);
  91.                                 }
  92.                         }
  93.                         System.out.println("terminating broker context");
  94.                         context.term();
  95.                 }
  96.                
  97.         }
  98.        
  99.         static class Worker implements Runnable {
  100.  
  101.                 @Override
  102.                 public void run() {
  103.                         ZMQ.Context context = ZMQ.context(1);
  104.                         ZMQ.Socket worker = context.socket(ZMQ.DEALER);
  105.                         worker.setIdentity("W".getBytes());
  106.                         worker.connect(BACKEND_URL);
  107.                         while (!Thread.currentThread().isInterrupted()) {
  108.                                 ZMsg msg = ZMsg.recvMsg(worker);
  109.                                 System.out.println(String.format("received message on worker:%s", msg));
  110.                                 msg.send(worker);
  111.                         }
  112.                         System.out.println("terminating worker context");
  113.                         context.term();
  114.                 }
  115.                
  116.         }
  117.  
  118. }
  119.  
  120. // C client
  121. //////////////////////////////////////////////////////
  122. /// Initializes ZeroMQ context and connects to server url
  123. /// @param ip the IP address of the server
  124. /// @param port the port number of the server
  125. /// @return 0 for success; non-zero otherwise
  126. int comm_new(const char* ip, const char* port) {
  127.   snprintf(server, 40, "%s%s%s%s", "tcp://", ip, ":", port);
  128.   // create context
  129.   ctx = zctx_new();
  130.   client = zsocket_new(ctx, ZMQ_REQ);
  131.   int ret = zsocket_connect(client, server);
  132.   if (ret != 0) { dumpError("zsocket_connect", 21); }
  133.   zsocket_set_identity(client, "C");
  134.   return ret;
  135. }
  136.  
  137. //////////////////////////////////////////////////////
  138. /// Run a synchronous test with server
  139. /// @param cnt the number of iterations
  140. /// @param debug whether to print debug statements
  141. void comm_send_synch(int cnt, bool debug) {
  142.   printf("Synchronous round-trip test...\n");
  143.   int requests;
  144.   int64_t start = zclock_time();
  145.  
  146.   for (requests = 0; requests < cnt; requests++) {
  147.  
  148.     int ret = zstr_send (client, "Hello");
  149.  
  150.     if (ret != 0) {
  151.       if (debug) { dumpError ("zstr_send", 90); }
  152.     } else {
  153.  
  154.       char *reply = zstr_recv (client);
  155.       if (debug) { printf ("\n\treceived: %s\n", reply); }
  156.       free (reply);
  157.     }
  158.   }
  159.   printf("\n\t%d calls/second\n",
  160.     (1000 * cnt) / (int) (zclock_time() - start));
  161. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top