Advertisement
Guest User

Untitled

a guest
May 24th, 2017
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 5.90 KB | None | 0 0
  1. package org.cometd.server.util;
  2.  
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.locks.Condition;
  7. import java.util.concurrent.locks.ReadWriteLock;
  8. import java.util.concurrent.locks.ReentrantReadWriteLock;
  9.  
  10. import org.cometd.bayeux.server.BayeuxServer;
  11. import org.cometd.bayeux.server.ServerChannel;
  12. import org.cometd.bayeux.server.ServerSession;
  13. import org.cometd.bayeux.server.ServerMessage.Mutable;
  14. import org.eclipse.jetty.util.log.Log;
  15.  
  16.  
  17. /* ------------------------------------------------------------ */
  18. /** Channel Initialised Listener.
  19.  * This listener implements a latch on a channel name, which can be
  20.  * waited on to ensure that all prior channel listeners have been
  21.  * executed before continuing.
  22.  */
  23. public class ChannelInitializedListener implements BayeuxServer.ChannelListener
  24. {
  25.     final Map<String,Integer> _channels = new HashMap<String,Integer>();
  26.     ReadWriteLock _lock = new ReentrantReadWriteLock();
  27.     Condition _modified = _lock.writeLock().newCondition();
  28.  
  29.     /* ------------------------------------------------------------ */
  30.     @Override
  31.     public void channelAdded(ServerChannel channel)
  32.     {
  33.         try
  34.         {
  35.             _lock.writeLock().lock();
  36.             Integer count= _channels.get(channel.getId());
  37.             if (count==null)
  38.                 _channels.put(channel.getId(),1);
  39.             else if (++count==0)
  40.                 _channels.remove(channel.getId());
  41.             else
  42.                 _channels.put(channel.getId(),count);
  43.  
  44.             _modified.signalAll();
  45.         }
  46.         finally
  47.         {
  48.             _lock.writeLock().unlock();
  49.         }
  50.     }
  51.    
  52.     /* ------------------------------------------------------------ */
  53.     @Override
  54.     public void channelRemoved(String channelId)
  55.     {
  56.         try
  57.         {
  58.             _lock.writeLock().lock();
  59.             Integer count= _channels.get(channelId);
  60.             if (count==null)
  61.                 _channels.put(channelId,-11);
  62.             else if (--count==0)
  63.                 _channels.remove(channelId);
  64.             else
  65.                 _channels.put(channelId,count);
  66.  
  67.             _modified.signalAll();
  68.         }
  69.         finally
  70.         {
  71.             _lock.writeLock().unlock();
  72.         }
  73.     }
  74.    
  75.    
  76.     /* ------------------------------------------------------------ */
  77.     public boolean waitForChannel(String channelId,long waitFor,TimeUnit units)
  78.     throws InterruptedException
  79.     {
  80.         try
  81.         {
  82.             _lock.readLock().lock();
  83.             Integer count = _channels.get(channelId);
  84.             if (count!=null && count>0)
  85.                 return true;
  86.         }
  87.         finally
  88.         {
  89.             _lock.readLock().unlock();
  90.         }
  91.        
  92.         // have to wait, so need the write lock
  93.         try
  94.         {
  95.             long nanos = units.toNanos(waitFor);
  96.             _lock.writeLock().lock();
  97.             while(true)
  98.             {
  99.                 Integer count = _channels.get(channelId);
  100.                 if (count!=null && count>0)
  101.                     return true;
  102.            
  103.                 if (nanos>0)
  104.                     nanos=_modified.awaitNanos(nanos);
  105.                 else
  106.                     return false;
  107.             }
  108.         }
  109.         finally
  110.         {
  111.             _lock.writeLock().unlock();
  112.         }
  113.     }
  114.    
  115.    
  116.     /* ------------------------------------------------------------ */
  117.     /** Get an entangled subscription listeners.
  118.      * Subscription listeners added after this listener will be guaranteed
  119.      * that all ChannelListeners before the ChannelInitializedListener will have been called.
  120.      * @param waitFor
  121.      * @param units
  122.      * @return
  123.      */
  124.     ServerChannel.SubscriptionListener getEntangledSubscriptionListener(final long waitFor, final TimeUnit units)
  125.     {
  126.         return new ServerChannel.SubscriptionListener()
  127.         {
  128.             @Override
  129.             public void unsubscribed(ServerSession session, ServerChannel channel)
  130.             {
  131.                 try
  132.                 {
  133.                     if (waitForChannel(channel.getId(),waitFor,units))
  134.                         throw new IllegalStateException();
  135.                 }
  136.                 catch(InterruptedException e)
  137.                 {
  138.                     throw new IllegalStateException(e);
  139.                 }
  140.             }
  141.            
  142.             @Override
  143.             public void subscribed(ServerSession session, ServerChannel channel)
  144.             {
  145.                 try
  146.                 {
  147.                     if (waitForChannel(channel.getId(),waitFor,units))
  148.                         throw new IllegalStateException();
  149.                 }
  150.                 catch(InterruptedException e)
  151.                 {
  152.                     throw new IllegalStateException(e);
  153.                 }
  154.             }
  155.         };
  156.     }
  157.    
  158.  
  159.     /* ------------------------------------------------------------ */
  160.     /** Get an entangled message listeners.
  161.      * Message listeners added after this listener will be guaranteed
  162.      * that all ChannelListeners before the ChannelInitializedListener will
  163.      * have been called.
  164.      * @param waitFor
  165.      * @param units
  166.      * @return
  167.      */
  168.     ServerChannel.MessageListener getEntangledMessageListener(final long waitFor, final TimeUnit units)
  169.     {
  170.         return new ServerChannel.MessageListener()
  171.         {
  172.             @Override
  173.             public boolean onMessage(ServerSession from, ServerChannel channel, Mutable message)
  174.             {
  175.                 try
  176.                 {
  177.                     if (waitForChannel(channel.getId(),waitFor,units))
  178.                         return true;
  179.                 }
  180.                 catch(InterruptedException e)
  181.                 {
  182.                     Log.ignore(e);
  183.                 }
  184.                 return false;
  185.             }
  186.            
  187.         };
  188.     }
  189. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement