Advertisement
Guest User

Untitled

a guest
Dec 5th, 2019
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.96 KB | None | 0 0
  1. package com.wti.osmosis.ologger;
  2.  
  3. import java.nio.channels.SelectableChannel;
  4. import java.util.Map;
  5. import java.util.concurrent.ConcurrentHashMap;
  6.  
  7. import org.apache.log4j.Logger;
  8. import org.testng.Assert;
  9. import org.zeromq.SocketType;
  10. import org.zeromq.ZContext;
  11. import org.zeromq.ZMQ;
  12. import org.zeromq.ZPoller;
  13.  
  14. import com.wti.osmosis.ocommon.message.BaseMessage;
  15. import com.wti.osmosis.ocommon.util.log.LoggerFactory;
  16.  
  17. public class TestRouter
  18.     implements ZPoller.EventsHandler
  19. {
  20.     private static Logger logger = LoggerFactory.getLogger( TestRouter.class );
  21.    
  22.     ZContext context;
  23.     ZMQ.Socket socket;
  24.     Map<String,BaseMessage> pendingAck = new ConcurrentHashMap<>();
  25.     Map<String,BaseMessage> pendingReply = new ConcurrentHashMap<>();
  26.  
  27.     public TestRouter(String address, ZContext context, ZPoller poller)
  28.     {
  29.         this.context = context;
  30.         socket = context.createSocket( SocketType.ROUTER );
  31.  
  32.         socket.setIdentity( "7bb14583-4258-4192-bc1a-7b7a0f5443cc".getBytes() );
  33.         socket.connect( address );
  34.         poller.register(socket, this, ZPoller.IN);
  35.     }
  36.  
  37.     @Override
  38.     public boolean events(ZMQ.Socket socket, int events)
  39.     {
  40.         BaseMessage msg = BaseMessage.recvMsg( socket );
  41.         switch( msg.getType() )
  42.         {
  43.             case ACK:
  44.                 Assert.assertNotNull( pendingAck.remove( msg.getIdentifier() ) );
  45.                 logger.info( "ACK:   " + msg.toString( true ) );
  46.                 break;
  47.  
  48.             case REPLY:
  49.                 Assert.assertNotNull( pendingReply.remove( msg.getIdentifier() ) );
  50.                 logger.info( "REPLY: " + msg.toString( true ) );
  51.                 break;
  52.  
  53.             default:
  54.                 logger.info( "Unexpected Message: " + msg.toString(true) );
  55.         }
  56.         return false;
  57.     }
  58.  
  59.     @Override
  60.     public boolean events(SelectableChannel channel, int events)
  61.     {
  62.         throw new UnsupportedOperationException();
  63.     }
  64. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement