Advertisement
Guest User

Receiver

a guest
May 22nd, 2014
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.50 KB | None | 0 0
  1. package org.zeromq;
  2.  
  3. import org.junit.Test;
  4.  
  5. import java.util.ArrayList;
  6. import java.util.List;
  7.  
  8. public class Receiver {
  9.  
  10.   static final byte[] PING = "ping".getBytes();
  11.  
  12.   @Test
  13.   public void t0() throws InterruptedException {
  14.     Thread t0 = new Thread(new Runnable() {
  15.       @Override
  16.       public void run() {
  17.         ZMQ.Context ctx = ZMQ.context(1);
  18.         ZMQ.Poller poller = new ZMQ.Poller(1);
  19.         ZMQ.Socket frontend = createFrontend(ctx);
  20.  
  21.         poller.register(frontend, ZMQ.Poller.POLLIN);
  22.  
  23.         long reqPerSec = 0;
  24.         long s = System.currentTimeMillis();
  25.         for (; ; ) {
  26.           poller.poll(1000);
  27.           // if no incoming traffic on frontend -- send a PING.
  28.           if (!poller.pollin(0)) {
  29.             sendPing(frontend);
  30.             System.out.println("Sent PING");
  31.           }
  32.           // if got incoming traffic -- recv it and send back; incr counter.
  33.           else {
  34.             // .recv()
  35.             List<byte[]> msg = recv(frontend);
  36.  
  37.             // .send()
  38.             send(frontend, msg);
  39.  
  40.             reqPerSec++;
  41.             // render request-per-second ...
  42.             if ((System.currentTimeMillis() - s) / 1000 >= 1) {
  43.               System.out.println("receiver: request-reply/sec ~ " + reqPerSec);
  44.               s = System.currentTimeMillis();
  45.               reqPerSec = 0;
  46.             }
  47.           }
  48.         }
  49.       }
  50.     });
  51.     t0.start();
  52.     t0.join();
  53.   }
  54.  
  55.   ZMQ.Socket createFrontend(ZMQ.Context ctx) {
  56.     ZMQ.Socket frontend = ctx.socket(ZMQ.DEALER);
  57.     frontend.setSendTimeOut(-1);
  58.     frontend.connect("tcp://localhost:5566");
  59.     return frontend;
  60.   }
  61.  
  62.   void send(ZMQ.Socket frontend, List<byte[]> msg) {
  63.     List<byte[]> msgCopy = new ArrayList<byte[]>();
  64.     msgCopy.add(new byte[0]); // empty frame for DEALER
  65.     msgCopy.add(msg.get(1/*PAYLOAD*/));
  66.     int i = 0;
  67.     for (byte[] frame : msgCopy) {
  68.       frontend.send(frame, ++i < msgCopy.size() ? ZMQ.SNDMORE : 0);
  69.     }
  70.   }
  71.  
  72.   List<byte[]> recv(ZMQ.Socket frontend) {
  73.     List<byte[]> msg = new ArrayList<byte[]>();
  74.     for (; ; ) {
  75.       msg.add(frontend.recv(0));
  76.       if (!frontend.hasReceiveMore()) {
  77.         break;
  78.       }
  79.     }
  80.     return msg;
  81.   }
  82.  
  83.   void sendPing(ZMQ.Socket frontend) {
  84.     List<byte[]> msg = new ArrayList<byte[]>();
  85.     msg.add(new byte[0]); // empty frame for DEALER
  86.     msg.add(PING);
  87.     int i = 0;
  88.     for (byte[] frame : msg) {
  89.       frontend.send(frame, ++i < msg.size() ? ZMQ.SNDMORE : 0);
  90.     }
  91.   }
  92. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement