Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.zeromq;
- import org.junit.Test;
- import java.util.ArrayList;
- import java.util.List;
- public class Receiver {
- static final byte[] PING = "ping".getBytes();
- @Test
- public void t0() throws InterruptedException {
- Thread t0 = new Thread(new Runnable() {
- @Override
- public void run() {
- ZMQ.Context ctx = ZMQ.context(1);
- ZMQ.Poller poller = new ZMQ.Poller(1);
- ZMQ.Socket frontend = createFrontend(ctx);
- poller.register(frontend, ZMQ.Poller.POLLIN);
- long reqPerSec = 0;
- long s = System.currentTimeMillis();
- for (; ; ) {
- poller.poll(1000);
- // if no incoming traffic on frontend -- send a PING.
- if (!poller.pollin(0)) {
- sendPing(frontend);
- System.out.println("Sent PING");
- }
- // if got incoming traffic -- recv it and send back; incr counter.
- else {
- // .recv()
- List<byte[]> msg = recv(frontend);
- // .send()
- send(frontend, msg);
- reqPerSec++;
- // render request-per-second ...
- if ((System.currentTimeMillis() - s) / 1000 >= 1) {
- System.out.println("receiver: request-reply/sec ~ " + reqPerSec);
- s = System.currentTimeMillis();
- reqPerSec = 0;
- }
- }
- }
- }
- });
- t0.start();
- t0.join();
- }
- ZMQ.Socket createFrontend(ZMQ.Context ctx) {
- ZMQ.Socket frontend = ctx.socket(ZMQ.DEALER);
- frontend.setSendTimeOut(-1);
- frontend.connect("tcp://localhost:5566");
- return frontend;
- }
- void send(ZMQ.Socket frontend, List<byte[]> msg) {
- List<byte[]> msgCopy = new ArrayList<byte[]>();
- msgCopy.add(new byte[0]); // empty frame for DEALER
- msgCopy.add(msg.get(1/*PAYLOAD*/));
- int i = 0;
- for (byte[] frame : msgCopy) {
- frontend.send(frame, ++i < msgCopy.size() ? ZMQ.SNDMORE : 0);
- }
- }
- List<byte[]> recv(ZMQ.Socket frontend) {
- List<byte[]> msg = new ArrayList<byte[]>();
- for (; ; ) {
- msg.add(frontend.recv(0));
- if (!frontend.hasReceiveMore()) {
- break;
- }
- }
- return msg;
- }
- void sendPing(ZMQ.Socket frontend) {
- List<byte[]> msg = new ArrayList<byte[]>();
- msg.add(new byte[0]); // empty frame for DEALER
- msg.add(PING);
- int i = 0;
- for (byte[] frame : msg) {
- frontend.send(frame, ++i < msg.size() ? ZMQ.SNDMORE : 0);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement