Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.zeromq.ZMQ;
- public class RouterDealerExample {
- static final String URI = "tcp://127.0.0.1:55552";
- static final AtomicBoolean finished = new AtomicBoolean(false);
- static final AtomicInteger SENT = new AtomicInteger(0);
- static final AtomicInteger RECEIVED = new AtomicInteger(0);
- static AtomicInteger connectedCount = new AtomicInteger(0);
- /**
- * Worker runnable consumes messages until it receives an END
- * message.
- */
- public static class Client implements Runnable {
- public final String name;
- Client(String name) {
- this.name = name;
- }
- public void run() {
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket socket = context.socket(ZMQ.DEALER);
- socket.setIdentity(name.getBytes());
- socket.connect(URI);
- // Option 2. Send a message through to connect fully.
- // socket.send("".getBytes(), 0);
- connectedCount.incrementAndGet();
- System.out.println("Connected-count=" + connectedCount.get());
- while (!finished.get()) {
- byte[] data = socket.recv(0);
- RECEIVED.incrementAndGet();
- String msg = new String(data);
- System.out.println(String.format("[" + RECEIVED.get() + "/" + SENT.get() + "]Worker %s received '%s'", name, msg));
- }
- socket.close();
- context.term();
- }
- }
- private static void initClients() {
- for (int i = 0; i < clientCount; i++) {
- Thread workerA = new Thread(new Client("client-" + i));
- workerA.start();
- }
- }
- static int clientCount = 50;
- private static void initServer() throws Exception {
- new Thread() {
- @Override
- public void run() {
- ZMQ.Context context = null;
- ZMQ.Socket socket = null;
- try {
- context = ZMQ.context(1);
- socket = context.socket(ZMQ.ROUTER);
- socket.bind(URI);
- // Option 2.
- // socket.send("".getBytes(), 0);
- // Option 1. this will work usually, to ensure endpoints are connected.
- // Thread.sleep(100);
- while (connectedCount.get() < clientCount) {
- Thread.sleep(10);
- System.out.println("Waiting for connections ...");
- }
- // Send 10 tasks, scattered to A twice as often as B.
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < clientCount; j++) {
- SENT.incrementAndGet();
- socket.send(("client-" + j).getBytes(), ZMQ.SNDMORE);
- socket.send(("Workload[" + i + "/total-sent=" + SENT.get() + "] for client-" + j).getBytes(), 0);
- }
- }
- Thread.sleep(1000); // let messages send and arrive.
- System.out.println("Total counts [sent=" + SENT.get() + ", received=" + RECEIVED.get() + "]");
- } catch (Exception e) {
- System.out.println("Failed");
- e.printStackTrace(System.err);
- } finally {
- socket.close();
- context.term();
- }
- }
- }.start();
- }
- public static void main(String[] args) throws Exception {
- initServer();
- initClients();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement