Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package blogindex.zookeeper;
- import java.io.*;
- import java.util.*;
- import java.util.concurrent.*;
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.*;
- /**
- * Handles watching for changes in a zookeeper node and calling methods on the
- * NodeListener class.
- *
- * This API is designed to be simpler than the native Zookeeper APIs for dealing
- * with file reads. More advanced locking primitives probably need to use the
- * raw API for now.
- *
- * This package also handles calling all listener methods in the main thread
- * that calls poll() and not the thread that Zookeeper is using. This can add
- * additional safety as one might block zookeeper events or corrupt a
- * non-synchronized object.
- *
- * All events are applied in the order they are received. There will never be
- * out of order or parallel execution.
- *
- * Connections are initiated on init and attempted to persist event during total
- * ensemble failure. If a ensemble goes away or your session expires an
- * onFailure method will be raised.
- *
- * Applications should probably handle onFailure in some way. Possibly shut
- * down but at least log this fact. Same with onKeeperException.
- *
- * @version $Id: $
- */
- public class NodeWatcher {
- /*
- TODO:
- - Determine which threads run what and try to separate this control
- logic into dedicated interfaces to make it a bit easier to understand.
- - only fire exceptions when I first encounter them
- */
- NodeListener listener = null;
- WatcherImpl watcherImpl = null;
- public NodeWatcher( String host,
- NodeListener listener ) {
- this.listener = listener;
- watcherImpl = new WatcherImpl( host, listener );
- watcherImpl.connect();
- }
- /**
- * Start watching a node for modifications.
- *
- */
- public void watch( String path ) {
- // We have to perform an exists() first because we need to stat() the
- // file to verify that it exists. Further, we have to perform this in
- // the poll() thread to prevent blocking IO.
- watcherImpl.enqueue( new WatchNodeRequest( path ) );
- }
- /**
- * Blocking call that waits for events to be called from the dispatch
- * thread. This thread MUST be called or your thread won't be processing
- * events in your listener.
- *
- */
- public void poll() throws InterruptedException {
- watcherImpl.poll();
- }
- }
- /**
- * Watcher delegate. Handles the real zookeeper work.
- */
- class WatcherImpl implements Watcher {
- public static final long RECONNECT_SPIN_INTERVAL = 1000L;
- public static final boolean DEFAULT_TRACE = true;
- public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
- /**
- * Keeps track of nodes that have onData events propagated and the stat
- * version so that we don't raise onData more than once for the same
- * version. There is a bug/feature in ZK which causes the same version to
- * be raise multiple times on server reconnect.
- */
- private Map<String,Stat> nodeStatMap = new ConcurrentHashMap();
- ZooKeeper zk = null;
- NodeListener listener = null;
- ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
- boolean trace = DEFAULT_TRACE;
- private String host = null;
- /**
- * True when we're connected to the cluster.
- */
- private boolean connected = false;
- /**
- * True when we've failed. Connected and failed are related but it's
- * possible to not be connected but not yet failed.
- */
- private boolean failed = false;
- private long lastDisconnected = -1;
- private int sessionTimeout = -1;
- /**
- * The last state of ZK.
- */
- private Event.KeeperState state = null;
- protected WatcherImpl( String host,
- NodeListener listener ) {
- this( host, DEFAULT_SESSION_TIMEOUT, listener );
- }
- protected WatcherImpl( String host,
- int sessionTimeout,
- NodeListener listener ) {
- this.host = host;
- this.sessionTimeout = sessionTimeout;
- this.listener = listener;
- }
- /**
- * Connect to ZK based on our existing configuration information.
- */
- protected void connect() {
- //try to connect again
- try {
- trace( "connect()ing\n" );
- zk = new ZooKeeper( host, sessionTimeout, this );
- } catch ( IOException e ) {
- //If we receive an IOException on connect it has must be bubbled up
- //to the listener.
- listener.onKeeperException( newKeeperException( e ) );
- }
- }
- /**
- * Poll for events.
- *
- */
- protected void poll() throws InterruptedException {
- synchronized( this ) {
- while( true ) {
- handleState();
- if ( connected ) {
- handleQueueEvents();
- }
- wait( 500L );
- }
- }
- }
- private void handleState() {
- //get a copy of the last state.
- Event.KeeperState state = this.state;
- if ( state != null ) {
- trace( "keeper state: %s\n", state );
- switch(state) {
- //We REALLY only care about when our session expires connect
- //can be thrown when an individual server disconnects.
- case SyncConnected:
- handleConnected();
- break;
- case Disconnected:
- handleDisconnected();
- break;
- case Expired:
- handleFailure();
- break;
- }
- //reset the state so that it is handled.
- state = null;
- }
- }
- /**
- * Handle all the events from the queue.
- */
- private void handleQueueEvents() {
- ConcurrentLinkedQueue tmp = new ConcurrentLinkedQueue( queue );
- trace( "queue size: %s\n", tmp.size() );
- trace( "queue is currently: %s\n", queue );
- while( true ) {
- //process the tmp queue because we need to re-enqueue objects if
- //they fail.
- Object entry = tmp.poll() ;
- queue.remove( entry );
- //done handling events.
- if ( entry == null )
- break;
- trace( "Handling queue event: %s\n", entry );
- if ( entry instanceof OnDataEvent ) {
- OnDataEvent event = (OnDataEvent)entry;
- fireOnDataEvent( event.path, event.stat, event.data );
- continue;
- }
- if ( entry instanceof WatchNodeRequest ) {
- WatchNodeRequest request = (WatchNodeRequest)entry;
- trace( "Running watch request for: %s\n", request.path );
- if ( watchNodeExists( request.path, true ) ) {
- continue;
- }
- enqueue( request );
- }
- }
- }
- /**
- * Handle ZK watches.
- */
- public void process(WatchedEvent event) {
- trace( "process() received WatchedEvent %s\n", event );
- Event.EventType type = event.getType();
- if ( type == type.None ) {
- state = event.getState();
- synchronized( this ) {
- notify();
- }
- return;
- }
- if ( type == type.NodeDataChanged || type == type.NodeCreated ) {
- //NOTE that this must be a blocking operation here because the ZK
- //server could disconnect/crash after the event but before the read.
- watchNodeData( event.getPath() );
- } else if ( type == type.NodeDeleted ) {
- //wait for it to come back of course...
- watchNodeExists( event.getPath() );
- }
- }
- /**
- * When we've been disconnected too long note that we've failed.
- *
- */
- private void handleDisconnected() {
- if ( failed ) return;
- if ( lastDisconnected == -1 ) {
- lastDisconnected = System.currentTimeMillis();
- } else if ( System.currentTimeMillis() - lastDisconnected > sessionTimeout ) {
- //ZK session expiration doesn't work when all the servers are
- //offline to we need to handle this by default.
- handleFailure();
- connect();
- }
- }
- /**
- * Handle failure. This can be called from multiple threads so it must be
- * synchronized. This thread that can call this are the event handler from
- * ZK and the main thread when it first calls watchNodeExists.
- */
- private synchronized void handleFailure() {
- if ( failed ) return;
- connected = false;
- failed = true;
- //we're offline.
- fireOnFailureEvent();
- }
- private void handleConnected() {
- //don't fire if we're already disconnected.
- if ( connected )
- return;
- listener.onConnect();
- connected = true;
- failed = false;
- }
- private void fireOnFailureEvent() {
- //don't fire if we're already disconnected.
- if ( connected = false )
- return;
- listener.onFailure();
- }
- /**
- * Fire an onData event, when necessary. Then update the version
- * information we're storing so that we don't fire the same version again.
- *
- */
- private void fireOnDataEvent( String path, Stat stat, byte[] data ) {
- if ( nodeStatMap.containsKey( path ) ) {
- if ( stat.getCzxid() == nodeStatMap.get( path ).getCzxid() ) {
- if ( stat.getVersion() == nodeStatMap.get( path ).getVersion() )
- return;
- }
- }
- nodeStatMap.put( path, stat );
- listener.onData( path, stat, data );
- }
- /**
- * Fired when we've seen an internal ZK exception.
- *
- */
- private void fireOnKeeperException( KeeperException e ) {
- listener.onKeeperException( e );
- }
- protected void enqueue( Object entry ) {
- trace( "enqueue() object %s\n", entry );
- queue.add( entry );
- synchronized( this ) {
- notify();
- }
- }
- protected void enqueue( String path, Stat stat, byte[] data ) {
- trace( "enqueue() data at %s\n", path );
- OnDataEvent event = new OnDataEvent();
- event.path = path;
- event.stat = stat;
- event.data = data;
- enqueue( event );
- }
- /**
- * Used so that we can have a stable interface for onKeeperException.
- *
- */
- private KeeperException newKeeperException( Exception e ) {
- KeeperException.SystemErrorException rethrow = new KeeperException.SystemErrorException();
- rethrow.initCause( e );
- return rethrow;
- }
- private Stat exists( String path, boolean watch ) throws KeeperException {
- try {
- return zk.exists( path, watch );
- } catch ( InterruptedException e ) {
- //I have no idea why it's throwing an InterruptedException...
- throw newKeeperException( e );
- }
- }
- /**
- * Get data (sync) from ZK.
- *
- */
- private byte[] getData( String path, boolean watch, Stat stat ) throws KeeperException {
- try {
- return zk.getData( path, watch, stat );
- } catch ( InterruptedException e ) {
- throw newKeeperException( e );
- }
- }
- protected boolean watchNodeExists( String path ) {
- return watchNodeExists( path, false );
- }
- /**
- * Request that we watch for when the node exists.
- *
- * Note that this retries on failure.
- *
- */
- protected boolean watchNodeExists( String path, boolean noSpinOnFailure ) {
- while( true ) {
- try {
- Stat stat = exists( path, true );
- if ( stat != null )
- watchNodeData( path, noSpinOnFailure );
- return true;
- } catch ( KeeperException.ConnectionLossException e ) {
- if ( noSpinOnFailure ) return false;
- //ConnectionLossException can be thrown when we attempt a call
- //over a zookeeper instance that is disconnected, even it if is
- //a short temporary disconnect. We should only sleep here
- //because reconnect works by listening to disconnect events and
- //waiting until the session expires.
- try {
- Thread.sleep( RECONNECT_SPIN_INTERVAL );
- } catch ( InterruptedException ie ) {
- return false;
- }
- } catch ( KeeperException e ) {
- fireOnKeeperException( e );
- handleFailure();
- }
- }
- }
- protected boolean watchNodeData( String path ) {
- return watchNodeData( path, false );
- }
- /**
- * Request that we watch future node data changes.
- *
- * Note that this retries on failure.
- *
- */
- protected boolean watchNodeData( String path, boolean noSpinOnFailure ) {
- while( true ) {
- try {
- // read the current version and watch future versions...
- Stat stat = new Stat();
- byte[] data = getData( path , true, stat );
- enqueue( path, stat, data );
- return true;
- } catch ( KeeperException.NoAuthException e ) {
- //this is normal.. we no longer have permission to read the
- //file.
- } catch ( KeeperException.NoNodeException e ) {
- //the file was removed between the event and when we attempted a
- //read. This is probably fine but it would be nice if the
- //server would send over the data.
- watchNodeExists( path );
- } catch ( KeeperException.ConnectionLossException e ) {
- //ConnectionLossException can be thrown when we attempt a call
- //over a zookeeper instance that is disconnected, even it if is
- //a short temporary disconnect. We should only sleep here
- //because reconnect works by listening to disconnect events and
- //waiting until the session expires.
- if ( noSpinOnFailure ) return false;
- try {
- Thread.sleep( RECONNECT_SPIN_INTERVAL );
- } catch ( InterruptedException ie ) {
- return false;
- }
- } catch ( KeeperException e ) {
- fireOnKeeperException( e );
- handleFailure();
- }
- }
- }
- /**
- * Simple trace method. I will migrate this to log4j later or just remove it.
- *
- */
- private void trace( String format, Object... args ) {
- if ( trace == false )
- return;
- System.out.printf( format, args );
- }
- }
- /**
- * Used to represent an onData event within the queue.
- *
- */
- class OnDataEvent {
- String path;
- Stat stat;
- byte[] data;
- }
/**
 * Queue entry asking that a watch be added for a node. The watch itself is
 * set up later by whoever drains the queue, not by the code that creates the
 * request.
 */
class WatchNodeRequest {

    /** Path of the node to watch. */
    String path;

    public WatchNodeRequest( String path ) {
        this.path = path;
    }

    /**
     * Human-readable form for trace output; without it the queue traces only
     * show an identity hash. Equality is intentionally left as identity.
     */
    @Override
    public String toString() {
        return "WatchNodeRequest{path=" + path + "}";
    }

}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement