Advertisement
Guest User

Untitled

a guest
May 24th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 4.84 KB | None | 0 0
  1. package org.cometd.server.util;
  2.  
  3. import java.util.concurrent.ConcurrentHashMap;
  4. import java.util.concurrent.ConcurrentMap;
  5. import java.util.concurrent.CountDownLatch;
  6. import java.util.concurrent.TimeUnit;
  7.  
  8. import org.cometd.bayeux.server.BayeuxServer;
  9. import org.cometd.bayeux.server.ServerChannel;
  10. import org.cometd.bayeux.server.ServerSession;
  11. import org.cometd.bayeux.server.ServerMessage.Mutable;
  12. import org.eclipse.jetty.util.log.Log;
  13.  
  14.  
  15. /* ------------------------------------------------------------ */
  16. /** Channel Initialised Listener.
  17.  * This listener implements a latch on a channel name, which can be
  18.  * waited on to ensure that all prior channel listeners have been
  19.  * executed before continuing.
  20.  */
  21. public class ChannelInitializedListener implements BayeuxServer.ChannelListener
  22. {
  23.     final ConcurrentMap<String, CountDownLatch> _latchMap=new ConcurrentHashMap<String, CountDownLatch>();
  24.    
  25.     @Override
  26.     public void channelAdded(ServerChannel channel)
  27.     {
  28.         final String id=channel.getId();
  29.         CountDownLatch latch = _latchMap.get(id);
  30.         if (latch==null)
  31.         {
  32.             CountDownLatch done = new CountDownLatch(0);
  33.             latch = _latchMap.putIfAbsent(id,done);
  34.             if (latch!=null)
  35.                 latch.countDown();
  36.         }
  37.         else
  38.             latch.countDown();
  39.     }
  40.  
  41.     @Override
  42.     public void channelRemoved(String channelId)
  43.     {
  44.         // hmmmm
  45.     }
  46.  
  47.     public void waitForChannel(String channelId)
  48.         throws InterruptedException
  49.     {
  50.         CountDownLatch latch = _latchMap.get(channelId);
  51.         if (latch==null)
  52.         {
  53.             CountDownLatch proposed = new CountDownLatch(1);
  54.             latch = _latchMap.putIfAbsent(channelId,proposed);
  55.             if (latch==null)
  56.                 latch=proposed;
  57.         }
  58.        
  59.         latch.await();
  60.     }
  61.  
  62.     public boolean waitForChannel(String channelId,long waitFor,TimeUnit units)
  63.     throws InterruptedException
  64.     {
  65.         CountDownLatch latch = _latchMap.get(channelId);
  66.         if (latch==null)
  67.         {
  68.             CountDownLatch proposed = new CountDownLatch(1);
  69.             latch = _latchMap.putIfAbsent(channelId,proposed);
  70.             if (latch==null)
  71.                 latch=proposed;
  72.         }
  73.  
  74.         return latch.await(waitFor,units);
  75.     }
  76.    
  77.     /* ------------------------------------------------------------ */
  78.     /** Get an entangled subscription listeners.
  79.      * Subscription listeners added after this listener will be guaranteed
  80.      * that all ChannelListeners before the ChannelInitializedListener will have been called.
  81.      * @param waitFor
  82.      * @param units
  83.      * @return
  84.      */
  85.     ServerChannel.SubscriptionListener getEntangledSubscriptionListener(final long waitFor, final TimeUnit units)
  86.     {
  87.         return new ServerChannel.SubscriptionListener()
  88.         {
  89.             @Override
  90.             public void unsubscribed(ServerSession session, ServerChannel channel)
  91.             {
  92.                 try
  93.                 {
  94.                     if (waitForChannel(channel.getId(),waitFor,units))
  95.                         throw new IllegalStateException();
  96.                 }
  97.                 catch(InterruptedException e)
  98.                 {
  99.                     throw new IllegalStateException(e);
  100.                 }
  101.             }
  102.            
  103.             @Override
  104.             public void subscribed(ServerSession session, ServerChannel channel)
  105.             {
  106.                 try
  107.                 {
  108.                     if (waitForChannel(channel.getId(),waitFor,units))
  109.                         throw new IllegalStateException();
  110.                 }
  111.                 catch(InterruptedException e)
  112.                 {
  113.                     throw new IllegalStateException(e);
  114.                 }
  115.             }
  116.         };
  117.     }
  118.    
  119.  
  120.     /* ------------------------------------------------------------ */
  121.     /** Get an entangled message listeners.
  122.      * Message listeners added after this listener will be guaranteed
  123.      * that all ChannelListeners before the ChannelInitializedListener will
  124.      * have been called.
  125.      * @param waitFor
  126.      * @param units
  127.      * @return
  128.      */
  129.     ServerChannel.MessageListener getEntangledMessageListener(final long waitFor, final TimeUnit units)
  130.     {
  131.         return new ServerChannel.MessageListener()
  132.         {
  133.             @Override
  134.             public boolean onMessage(ServerSession from, ServerChannel channel, Mutable message)
  135.             {
  136.                 try
  137.                 {
  138.                     if (waitForChannel(channel.getId(),waitFor,units))
  139.                         return true;
  140.                 }
  141.                 catch(InterruptedException e)
  142.                 {
  143.                     Log.ignore(e);
  144.                 }
  145.                 return false;
  146.             }
  147.            
  148.         };
  149.     }
  150. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement