Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.cometd.server.util;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import org.cometd.bayeux.server.BayeuxServer;
- import org.cometd.bayeux.server.ServerChannel;
- import org.cometd.bayeux.server.ServerSession;
- import org.cometd.bayeux.server.ServerMessage.Mutable;
- import org.eclipse.jetty.util.log.Log;
- /* ------------------------------------------------------------ */
- /** Channel Initialised Listener.
- * This listener implements a latch on a channel name, which can be
- * waited on to ensure that all prior channel listeners have been
- * executed before continuing.
- */
- public class ChannelInitializedListener implements BayeuxServer.ChannelListener
- {
- final ConcurrentMap<String, CountDownLatch> _latchMap=new ConcurrentHashMap<String, CountDownLatch>();
- @Override
- public void channelAdded(ServerChannel channel)
- {
- final String id=channel.getId();
- CountDownLatch latch = _latchMap.get(id);
- if (latch==null)
- {
- CountDownLatch done = new CountDownLatch(0);
- latch = _latchMap.putIfAbsent(id,done);
- if (latch!=null)
- latch.countDown();
- }
- else
- latch.countDown();
- }
- @Override
- public void channelRemoved(String channelId)
- {
- // hmmmm
- }
- public void waitForChannel(String channelId)
- throws InterruptedException
- {
- CountDownLatch latch = _latchMap.get(channelId);
- if (latch==null)
- {
- CountDownLatch proposed = new CountDownLatch(1);
- latch = _latchMap.putIfAbsent(channelId,proposed);
- if (latch==null)
- latch=proposed;
- }
- latch.await();
- }
- public boolean waitForChannel(String channelId,long waitFor,TimeUnit units)
- throws InterruptedException
- {
- CountDownLatch latch = _latchMap.get(channelId);
- if (latch==null)
- {
- CountDownLatch proposed = new CountDownLatch(1);
- latch = _latchMap.putIfAbsent(channelId,proposed);
- if (latch==null)
- latch=proposed;
- }
- return latch.await(waitFor,units);
- }
- /* ------------------------------------------------------------ */
- /** Get an entangled subscription listeners.
- * Subscription listeners added after this listener will be guaranteed
- * that all ChannelListeners before the ChannelInitializedListener will have been called.
- * @param waitFor
- * @param units
- * @return
- */
- ServerChannel.SubscriptionListener getEntangledSubscriptionListener(final long waitFor, final TimeUnit units)
- {
- return new ServerChannel.SubscriptionListener()
- {
- @Override
- public void unsubscribed(ServerSession session, ServerChannel channel)
- {
- try
- {
- if (waitForChannel(channel.getId(),waitFor,units))
- throw new IllegalStateException();
- }
- catch(InterruptedException e)
- {
- throw new IllegalStateException(e);
- }
- }
- @Override
- public void subscribed(ServerSession session, ServerChannel channel)
- {
- try
- {
- if (waitForChannel(channel.getId(),waitFor,units))
- throw new IllegalStateException();
- }
- catch(InterruptedException e)
- {
- throw new IllegalStateException(e);
- }
- }
- };
- }
- /* ------------------------------------------------------------ */
- /** Get an entangled message listeners.
- * Message listeners added after this listener will be guaranteed
- * that all ChannelListeners before the ChannelInitializedListener will
- * have been called.
- * @param waitFor
- * @param units
- * @return
- */
- ServerChannel.MessageListener getEntangledMessageListener(final long waitFor, final TimeUnit units)
- {
- return new ServerChannel.MessageListener()
- {
- @Override
- public boolean onMessage(ServerSession from, ServerChannel channel, Mutable message)
- {
- try
- {
- if (waitForChannel(channel.getId(),waitFor,units))
- return true;
- }
- catch(InterruptedException e)
- {
- Log.ignore(e);
- }
- return false;
- }
- };
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement