Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.cometd.server.util;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReadWriteLock;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
- import org.cometd.bayeux.server.BayeuxServer;
- import org.cometd.bayeux.server.ServerChannel;
- import org.cometd.bayeux.server.ServerSession;
- import org.cometd.bayeux.server.ServerMessage.Mutable;
- import org.eclipse.jetty.util.log.Log;
- /* ------------------------------------------------------------ */
- /** Channel Initialised Listener.
- * This listener implements a latch on a channel name, which can be
- * waited on to ensure that all prior channel listeners have been
- * executed before continuing.
- */
- public class ChannelInitializedListener implements BayeuxServer.ChannelListener
- {
- final Map<String,Integer> _channels = new HashMap<String,Integer>();
- ReadWriteLock _lock = new ReentrantReadWriteLock();
- Condition _modified = _lock.writeLock().newCondition();
- /* ------------------------------------------------------------ */
- @Override
- public void channelAdded(ServerChannel channel)
- {
- try
- {
- _lock.writeLock().lock();
- Integer count= _channels.get(channel.getId());
- if (count==null)
- _channels.put(channel.getId(),1);
- else if (++count==0)
- _channels.remove(channel.getId());
- else
- _channels.put(channel.getId(),count);
- _modified.signalAll();
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
- /* ------------------------------------------------------------ */
- @Override
- public void channelRemoved(String channelId)
- {
- try
- {
- _lock.writeLock().lock();
- Integer count= _channels.get(channelId);
- if (count==null)
- _channels.put(channelId,-11);
- else if (--count==0)
- _channels.remove(channelId);
- else
- _channels.put(channelId,count);
- _modified.signalAll();
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
- /* ------------------------------------------------------------ */
- public boolean waitForChannel(String channelId,long waitFor,TimeUnit units)
- throws InterruptedException
- {
- try
- {
- _lock.readLock().lock();
- Integer count = _channels.get(channelId);
- if (count!=null && count>0)
- return true;
- }
- finally
- {
- _lock.readLock().unlock();
- }
- // have to wait, so need the write lock
- try
- {
- long nanos = units.toNanos(waitFor);
- _lock.writeLock().lock();
- while(true)
- {
- Integer count = _channels.get(channelId);
- if (count!=null && count>0)
- return true;
- if (nanos>0)
- nanos=_modified.awaitNanos(nanos);
- else
- return false;
- }
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
- /* ------------------------------------------------------------ */
- /** Get an entangled subscription listeners.
- * Subscription listeners added after this listener will be guaranteed
- * that all ChannelListeners before the ChannelInitializedListener will have been called.
- * @param waitFor
- * @param units
- * @return
- */
- ServerChannel.SubscriptionListener getEntangledSubscriptionListener(final long waitFor, final TimeUnit units)
- {
- return new ServerChannel.SubscriptionListener()
- {
- @Override
- public void unsubscribed(ServerSession session, ServerChannel channel)
- {
- try
- {
- if (waitForChannel(channel.getId(),waitFor,units))
- throw new IllegalStateException();
- }
- catch(InterruptedException e)
- {
- throw new IllegalStateException(e);
- }
- }
- @Override
- public void subscribed(ServerSession session, ServerChannel channel)
- {
- try
- {
- if (waitForChannel(channel.getId(),waitFor,units))
- throw new IllegalStateException();
- }
- catch(InterruptedException e)
- {
- throw new IllegalStateException(e);
- }
- }
- };
- }
- /* ------------------------------------------------------------ */
- /** Get an entangled message listeners.
- * Message listeners added after this listener will be guaranteed
- * that all ChannelListeners before the ChannelInitializedListener will
- * have been called.
- * @param waitFor
- * @param units
- * @return
- */
- ServerChannel.MessageListener getEntangledMessageListener(final long waitFor, final TimeUnit units)
- {
- return new ServerChannel.MessageListener()
- {
- @Override
- public boolean onMessage(ServerSession from, ServerChannel channel, Mutable message)
- {
- try
- {
- if (waitForChannel(channel.getId(),waitFor,units))
- return true;
- }
- catch(InterruptedException e)
- {
- Log.ignore(e);
- }
- return false;
- }
- };
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement