Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.wti.osmosis.ologger;
- import java.nio.channels.SelectableChannel;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import org.apache.log4j.Logger;
- import org.testng.Assert;
- import org.zeromq.SocketType;
- import org.zeromq.ZContext;
- import org.zeromq.ZMQ;
- import org.zeromq.ZPoller;
- import com.wti.osmosis.ocommon.message.BaseMessage;
- import com.wti.osmosis.ocommon.util.log.LoggerFactory;
- public class TestRouter
- implements ZPoller.EventsHandler
- {
- private static Logger logger = LoggerFactory.getLogger( TestRouter.class );
- ZContext context;
- ZMQ.Socket socket;
- Map<String,BaseMessage> pendingAck = new ConcurrentHashMap<>();
- Map<String,BaseMessage> pendingReply = new ConcurrentHashMap<>();
- public TestRouter(String address, ZContext context, ZPoller poller)
- {
- this.context = context;
- socket = context.createSocket( SocketType.ROUTER );
- socket.setIdentity( "7bb14583-4258-4192-bc1a-7b7a0f5443cc".getBytes() );
- socket.connect( address );
- poller.register(socket, this, ZPoller.IN);
- }
- @Override
- public boolean events(ZMQ.Socket socket, int events)
- {
- BaseMessage msg = BaseMessage.recvMsg( socket );
- switch( msg.getType() )
- {
- case ACK:
- Assert.assertNotNull( pendingAck.remove( msg.getIdentifier() ) );
- logger.info( "ACK: " + msg.toString( true ) );
- break;
- case REPLY:
- Assert.assertNotNull( pendingReply.remove( msg.getIdentifier() ) );
- logger.info( "REPLY: " + msg.toString( true ) );
- break;
- default:
- logger.info( "Unexpected Message: " + msg.toString(true) );
- }
- return false;
- }
- @Override
- public boolean events(SelectableChannel channel, int events)
- {
- throw new UnsupportedOperationException();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement