Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // java server
- package com.onstar.ccg.ebike;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import org.zeromq.ZFrame;
- import org.zeromq.ZMQ;
- import org.zeromq.ZMsg;
- /**
- * You must pass the following JVM arg:
- * <tt>-Djava.library.path=/usr/local/lib</tt>
- * or you'll have to specify the library environment variable for you platform:
- * <tt>DYLD_LIBRARY_PATH</tt> for Mac or <tt>LD_LIBRARY_PATH</tt> for Linux
- */
- public class BrokerRunner {
- private static String FRONTEND_URL = "tcp://*:"+5555;
- // can't bind to * here for some reason
- private static String BACKEND_URL = "tcp://127.0.0.1:"+5556;
- public static void main(String[] args) {
- System.out.println("ZeroMQ Version: "+ZMQ.getVersionString());
- ExecutorService executor = Executors.newFixedThreadPool(2);
- Thread brokerThread = new Thread(new Broker());
- Thread workerThread = new Thread(new Worker());
- brokerThread.setDaemon(true);
- workerThread.setDaemon(true);
- executor.execute(brokerThread);
- executor.execute(workerThread);
- executor.shutdown();
- while (!executor.isTerminated()) {
- }
- System.out.println("finished");
- }
- static class Broker implements Runnable {
- @Override
- public void run() {
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket frontend = context.socket(ZMQ.ROUTER);
- ZMQ.Socket backend = context.socket(ZMQ.ROUTER);
- frontend.bind(FRONTEND_URL);
- backend.bind(BACKEND_URL);
- while(!Thread.currentThread().isInterrupted()) {
- ZMQ.Poller items = context.poller();
- items.register(frontend, ZMQ.Poller.POLLIN);
- items.register(backend, ZMQ.Poller.POLLIN);
- if (items.poll() == -1) {
- System.out.println("interrupted");
- break;
- }
- // poll frontend
- if (items.pollin(0)) {
- System.out.println("waiting to receive on frontend...");
- ZMsg msg = ZMsg.recvMsg(frontend);
- System.out.println("Got a message on frontend...");
- if (msg == null)
- break; // Interrupted
- ZFrame address = msg.pop();
- System.out.println("\tpopped address frame: "+address);
- address.destroy();
- System.out.println("\tpushed 'W' address frame and sending to backend..");
- msg.addFirst(new ZFrame("W"));
- msg.send(backend);
- }
- // poll backend
- if (items.pollin(1)) {
- System.out.println("waiting to receive on backend...");
- ZMsg msg = ZMsg.recvMsg(backend);
- System.out.println("Got a message on backend...");
- if (msg == null)
- break; // Interruped
- ZFrame address = msg.pop();
- System.out.println("\tpopped address frame: "+address);
- address.destroy();
- System.out.println("\tpushed 'C' address frame and sending to frontend..");
- msg.addFirst(new ZFrame("C"));
- msg.send(frontend);
- }
- }
- System.out.println("terminating broker context");
- context.term();
- }
- }
- static class Worker implements Runnable {
- @Override
- public void run() {
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket worker = context.socket(ZMQ.DEALER);
- worker.setIdentity("W".getBytes());
- worker.connect(BACKEND_URL);
- while (!Thread.currentThread().isInterrupted()) {
- ZMsg msg = ZMsg.recvMsg(worker);
- System.out.println(String.format("received message on worker:%s", msg));
- msg.send(worker);
- }
- System.out.println("terminating worker context");
- context.term();
- }
- }
- }
- // C client
- //////////////////////////////////////////////////////
- /// Initializes ZeroMQ context and connects to server url
- /// @param ip the IP address of the server
- /// @param port the port number of the server
- /// @return 0 for success; non-zero otherwise
- int comm_new(const char* ip, const char* port) {
- snprintf(server, 40, "%s%s%s%s", "tcp://", ip, ":", port);
- // create context
- ctx = zctx_new();
- client = zsocket_new(ctx, ZMQ_REQ);
- int ret = zsocket_connect(client, server);
- if (ret != 0) { dumpError("zsocket_connect", 21); }
- zsocket_set_identity(client, "C");
- return ret;
- }
- //////////////////////////////////////////////////////
- /// Run a synchronous test with server
- /// @param cnt the number of iterations
- /// @param debug whether to print debug statements
- void comm_send_synch(int cnt, bool debug) {
- printf("Synchronous round-trip test...\n");
- int requests;
- int64_t start = zclock_time();
- for (requests = 0; requests < cnt; requests++) {
- int ret = zstr_send (client, "Hello");
- if (ret != 0) {
- if (debug) { dumpError ("zstr_send", 90); }
- } else {
- char *reply = zstr_recv (client);
- if (debug) { printf ("\n\treceived: %s\n", reply); }
- free (reply);
- }
- }
- printf("\n\t%d calls/second\n",
- (1000 * cnt) / (int) (zclock_time() - start));
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement