Advertisement
Guest User

Untitled

a guest
Jan 8th, 2009
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 16.21 KB | None | 0 0
  1. package blogindex.zookeeper;
  2.  
  3. import java.io.*;
  4. import java.util.*;
  5. import java.util.concurrent.*;
  6.  
  7. import org.apache.zookeeper.*;
  8. import org.apache.zookeeper.data.*;
  9.  
  10. /**
  11.  * Handles watching for changes in a zookeeper node and calling methods on the
  12.  * NodeListener class.
  13.  *
  14.  * This API is designed to be simpler than the native Zookeeper APIs for dealing
  15.  * with file reads.  More advanced locking primitives probably need to use the
  16.  * raw API for now.
  17.  *
  18.  * This package also handles calling all listener methods in the main thread
  19.  * that calls poll() and not the thread that Zookeeper is using.  This can add
  20.  * additional safety as one might block zookeeper events or corrupt a
  21.  * non-synchronized object.
  22.  *
  23.  * All events are applied in the order they are received.  There will never be
  24.  * out of order or parallel execution.
  25.  *
  26.  * Connections are initiated on init and attempted to persist event during total
  27.  * ensemble failure.  If a ensemble goes away or your session expires an
  28.  * onFailure method will be raised.
  29.  *
  30.  * Applications should probably handle onFailure in some way.  Possibly shut
  31.  * down but at least log this fact.  Same with onKeeperException.
  32.  *
  33.  * @version $Id: $
  34.  */
  35. public class NodeWatcher {
  36.  
  37.     /*
  38.       TODO:
  39.  
  40.         - Determine which threads run what and try to separate this control
  41.           logic into dedicated interfaces to make it a bit easier to understand.
  42.  
  43.         - only fire exceptions when I first encounter them
  44.          
  45.      */
  46.    
  47.     NodeListener listener = null;
  48.     WatcherImpl watcherImpl = null;
  49.  
  50.     public NodeWatcher( String host,
  51.                         NodeListener listener ) {
  52.  
  53.         this.listener = listener;
  54.         watcherImpl = new WatcherImpl( host, listener );
  55.         watcherImpl.connect();
  56.  
  57.     }
  58.    
  59.     /**
  60.      * Start watching a node for modifications.
  61.      *
  62.      */
  63.     public void watch( String path ) {
  64.  
  65.         // We have to perform an exists() first because we need to stat() the
  66.         // file to verify that it exists.  Further, we have to perform this in
  67.         // the poll() thread to prevent blocking IO.
  68.  
  69.         watcherImpl.enqueue( new WatchNodeRequest( path ) );
  70.        
  71.     }
  72.  
  73.     /**
  74.      * Blocking call that waits for events to be called from the dispatch
  75.      * thread.  This thread MUST be called or your thread won't be processing
  76.      * events in your listener.
  77.      *
  78.      */
  79.     public void poll() throws InterruptedException {
  80.         watcherImpl.poll();
  81.     }
  82.    
  83. }
  84.  
  85. /**
  86.  * Watcher delegate.  Handles the real zookeeper work.
  87.  */
  88. class WatcherImpl implements Watcher {
  89.  
  90.     public static final long RECONNECT_SPIN_INTERVAL = 1000L;
  91.  
  92.     public static final boolean DEFAULT_TRACE = true;
  93.  
  94.     public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
  95.  
  96.     /**
  97.      * Keeps track of nodes that have onData events propagated and the stat
  98.      * version so that we don't raise onData more than once for the same
  99.      * version.  There is a bug/feature in ZK which causes the same version to
  100.      * be raise multiple times on server reconnect.
  101.      */
  102.     private Map<String,Stat> nodeStatMap = new ConcurrentHashMap();
  103.    
  104.     ZooKeeper zk = null;
  105.     NodeListener listener = null;
  106.     ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
  107.  
  108.     boolean trace = DEFAULT_TRACE;
  109.    
  110.     private String host = null;
  111.  
  112.     /**
  113.      * True when we're connected to the cluster.
  114.      */
  115.     private boolean connected = false;
  116.  
  117.     /**
  118.      * True when we've failed.  Connected and failed are related but it's
  119.      * possible to not be connected but not yet failed.
  120.      */
  121.     private boolean failed = false;
  122.    
  123.     private long lastDisconnected = -1;
  124.  
  125.     private int sessionTimeout = -1;
  126.  
  127.     /**
  128.      * The last state of ZK.    
  129.      */
  130.     private Event.KeeperState state = null;
  131.    
  132.     protected WatcherImpl( String host,
  133.                            NodeListener listener ) {
  134.         this( host, DEFAULT_SESSION_TIMEOUT, listener );
  135.     }
  136.  
  137.     protected WatcherImpl( String host,
  138.                            int sessionTimeout,
  139.                            NodeListener listener ) {
  140.         this.host = host;
  141.         this.sessionTimeout = sessionTimeout;
  142.         this.listener = listener;
  143.     }
  144.  
  145.     /**
  146.      * Connect to ZK based on our existing configuration information.
  147.      */
  148.     protected void connect() {
  149.  
  150.         //try to connect again
  151.         try {
  152.  
  153.             trace( "connect()ing\n" );
  154.             zk = new ZooKeeper( host, sessionTimeout, this );
  155.  
  156.         } catch ( IOException e ) {
  157.  
  158.             //If we receive an IOException on connect it has must be bubbled up
  159.             //to the listener.
  160.             listener.onKeeperException( newKeeperException( e ) );
  161.            
  162.         }
  163.  
  164.     }
  165.  
  166.     /**
  167.      * Poll for events.  
  168.      *
  169.      */
  170.     protected void poll() throws InterruptedException {
  171.  
  172.         synchronized( this ) {
  173.  
  174.             while( true ) {
  175.  
  176.                 handleState();
  177.  
  178.                 if ( connected ) {
  179.                     handleQueueEvents();
  180.                 }
  181.                
  182.                 wait( 500L );
  183.                
  184.             }
  185.  
  186.         }
  187.        
  188.     }
  189.  
  190.     private void handleState() {
  191.  
  192.         //get a copy of the last state.
  193.         Event.KeeperState state = this.state;
  194.  
  195.         if ( state != null ) {
  196.  
  197.             trace( "keeper state: %s\n", state );
  198.            
  199.             switch(state) {
  200.  
  201.                 //We REALLY only care about when our session expires connect
  202.                 //can be thrown when an individual server disconnects.
  203.  
  204.                 case SyncConnected:
  205.                     handleConnected();
  206.                     break;
  207.                 case Disconnected:
  208.                     handleDisconnected();
  209.                     break;
  210.                 case Expired:
  211.                     handleFailure();
  212.                     break;
  213.                
  214.             }
  215.  
  216.             //reset the state so that it is handled.
  217.             state = null;
  218.            
  219.         }
  220.  
  221.     }
  222.    
  223.     /**
  224.      * Handle all the events from the queue.
  225.      */
  226.     private void handleQueueEvents() {
  227.  
  228.         ConcurrentLinkedQueue tmp = new ConcurrentLinkedQueue( queue );
  229.  
  230.         trace( "queue size: %s\n", tmp.size() );
  231.         trace( "queue is currently: %s\n", queue );
  232.        
  233.         while( true ) {
  234.  
  235.             //process the tmp queue because we need to re-enqueue objects if
  236.             //they fail.
  237.             Object entry = tmp.poll() ;
  238.             queue.remove( entry );
  239.            
  240.             //done handling events.
  241.             if ( entry == null )
  242.                 break;
  243.  
  244.             trace( "Handling queue event: %s\n", entry );
  245.  
  246.             if ( entry instanceof OnDataEvent ) {
  247.  
  248.                 OnDataEvent event = (OnDataEvent)entry;
  249.  
  250.                 fireOnDataEvent( event.path, event.stat, event.data );
  251.                
  252.                 continue;
  253.             }
  254.  
  255.             if ( entry instanceof WatchNodeRequest ) {
  256.                
  257.                 WatchNodeRequest request = (WatchNodeRequest)entry;
  258.  
  259.                 trace( "Running watch request for: %s\n", request.path );
  260.                
  261.                 if ( watchNodeExists( request.path, true ) ) {
  262.                     continue;
  263.                 }
  264.                
  265.                 enqueue( request );
  266.  
  267.             }
  268.            
  269.         }
  270.  
  271.     }
  272.  
  273.     /**
  274.      * Handle ZK watches.
  275.      */
  276.     public void process(WatchedEvent event) {
  277.  
  278.         trace( "process() received WatchedEvent %s\n", event );
  279.        
  280.         Event.EventType type = event.getType();
  281.  
  282.         if ( type == type.None ) {
  283.  
  284.             state = event.getState();
  285.  
  286.             synchronized( this ) {
  287.                 notify();
  288.             }
  289.  
  290.             return;
  291.            
  292.         }
  293.        
  294.         if ( type == type.NodeDataChanged || type == type.NodeCreated ) {
  295.  
  296.             //NOTE that this must be a blocking operation here because the ZK
  297.             //server could disconnect/crash after the event but before the read.
  298.             watchNodeData( event.getPath() );
  299.                
  300.         } else if ( type == type.NodeDeleted ) {
  301.  
  302.             //wait for it to come back of course...
  303.             watchNodeExists( event.getPath() );
  304.  
  305.         }
  306.  
  307.     }
  308.  
  309.     /**
  310.      * When we've been disconnected too long note that we've failed.
  311.      *
  312.      */
  313.     private void handleDisconnected() {
  314.        
  315.         if ( failed ) return;
  316.  
  317.         if ( lastDisconnected == -1 ) {
  318.  
  319.             lastDisconnected = System.currentTimeMillis();
  320.  
  321.         } else if ( System.currentTimeMillis() - lastDisconnected > sessionTimeout ) {
  322.  
  323.             //ZK session expiration doesn't work when all the servers are
  324.             //offline to we need to handle this by default.
  325.            
  326.             handleFailure();
  327.             connect();
  328.            
  329.         }
  330.        
  331.     }
  332.  
  333.     /**
  334.      * Handle failure.  This can be called from multiple threads so it must be
  335.      * synchronized.  This thread that can call this are the event handler from
  336.      * ZK and the main thread when it first calls watchNodeExists.
  337.      */
  338.     private synchronized void handleFailure() {
  339.  
  340.         if ( failed ) return;
  341.  
  342.         connected = false;
  343.         failed = true;
  344.  
  345.         //we're offline.
  346.         fireOnFailureEvent();
  347.        
  348.     }
  349.    
  350.     private void handleConnected() {
  351.        
  352.         //don't fire if we're already disconnected.
  353.         if ( connected )
  354.             return;
  355.  
  356.         listener.onConnect();
  357.  
  358.         connected = true;
  359.         failed = false;
  360.  
  361.     }
  362.  
  363.     private void fireOnFailureEvent() {
  364.  
  365.         //don't fire if we're already disconnected.
  366.         if ( connected = false )
  367.             return;
  368.        
  369.         listener.onFailure();
  370.  
  371.     }
  372.    
  373.     /**
  374.      * Fire an onData event, when necessary.  Then update the version
  375.      * information we're storing so that we don't fire the same version again.
  376.      *
  377.      */
  378.     private void fireOnDataEvent( String path, Stat stat, byte[] data ) {
  379.        
  380.         if ( nodeStatMap.containsKey( path ) ) {
  381.  
  382.             if ( stat.getCzxid() == nodeStatMap.get( path ).getCzxid() ) {
  383.                
  384.                 if ( stat.getVersion() == nodeStatMap.get( path ).getVersion() )
  385.                     return;
  386.                
  387.             }
  388.            
  389.         }
  390.  
  391.         nodeStatMap.put( path, stat );
  392.        
  393.         listener.onData( path, stat, data );
  394.  
  395.     }
  396.  
  397.     /**
  398.      * Fired when we've seen an internal ZK exception.
  399.      *
  400.      */
  401.     private void fireOnKeeperException( KeeperException e ) {
  402.         listener.onKeeperException( e );
  403.     }
  404.    
  405.     protected void enqueue( Object entry ) {
  406.  
  407.         trace( "enqueue() object %s\n", entry );
  408.  
  409.         queue.add( entry );
  410.        
  411.         synchronized( this ) {
  412.             notify();
  413.         }
  414.  
  415.     }
  416.  
  417.     protected void enqueue( String path, Stat stat, byte[] data ) {
  418.  
  419.         trace( "enqueue() data at %s\n", path );
  420.        
  421.         OnDataEvent event = new OnDataEvent();
  422.         event.path = path;
  423.         event.stat = stat;
  424.         event.data = data;
  425.  
  426.         enqueue( event );
  427.  
  428.     }
  429.  
  430.     /**
  431.      * Used so that we can have a stable interface for onKeeperException.
  432.      *
  433.      */
  434.     private KeeperException newKeeperException( Exception e ) {
  435.  
  436.         KeeperException.SystemErrorException rethrow = new KeeperException.SystemErrorException();
  437.         rethrow.initCause( e );
  438.         return rethrow;
  439.  
  440.     }
  441.    
  442.     private Stat exists( String path, boolean watch ) throws KeeperException {
  443.         try {
  444.             return zk.exists( path, watch );
  445.         } catch ( InterruptedException e ) {
  446.             //I have no idea why it's throwing an InterruptedException...
  447.             throw newKeeperException( e );
  448.         }
  449.     }
  450.  
  451.     /**
  452.      * Get data (sync) from ZK.
  453.      *
  454.      */
  455.     private byte[] getData( String path, boolean watch, Stat stat ) throws KeeperException {
  456.  
  457.         try {
  458.             return zk.getData( path, watch, stat );
  459.         } catch ( InterruptedException e ) {
  460.             throw newKeeperException( e );
  461.         }
  462.     }
  463.  
  464.     protected boolean watchNodeExists( String path ) {
  465.         return watchNodeExists( path, false );
  466.     }
  467.  
  468.     /**
  469.      * Request that we watch for when the node exists.
  470.      *
  471.      * Note that this retries on failure.
  472.      *
  473.      */
  474.     protected boolean watchNodeExists( String path, boolean noSpinOnFailure ) {
  475.  
  476.         while( true ) {
  477.        
  478.             try {
  479.  
  480.                 Stat stat = exists( path, true );
  481.  
  482.                 if ( stat != null )
  483.                     watchNodeData( path, noSpinOnFailure );
  484.  
  485.                 return true;
  486.                
  487.             } catch ( KeeperException.ConnectionLossException e ) {
  488.  
  489.                 if ( noSpinOnFailure ) return false;
  490.  
  491.                 //ConnectionLossException can be thrown when we attempt a call
  492.                 //over a zookeeper instance that is disconnected, even it if is
  493.                 //a short temporary disconnect.  We should only sleep here
  494.                 //because reconnect works by listening to disconnect events and
  495.                 //waiting until the session expires.
  496.  
  497.                 try {
  498.                     Thread.sleep( RECONNECT_SPIN_INTERVAL );
  499.                 } catch ( InterruptedException ie ) {
  500.                     return false;
  501.                 }
  502.  
  503.             } catch ( KeeperException e ) {
  504.  
  505.                 fireOnKeeperException( e );
  506.                 handleFailure();
  507.  
  508.             }
  509.  
  510.         }
  511.        
  512.     }
  513.  
  514.     protected boolean watchNodeData( String path ) {
  515.         return watchNodeData( path, false );
  516.     }
  517.  
  518.     /**
  519.      * Request that we watch future node data changes.
  520.      *
  521.      * Note that this retries on failure.
  522.      *
  523.      */
  524.     protected boolean watchNodeData( String path, boolean noSpinOnFailure ) {
  525.  
  526.         while( true ) {
  527.        
  528.             try {
  529.                
  530.                 // read the current version and watch future versions...
  531.                 Stat stat = new Stat();
  532.                 byte[] data = getData( path , true, stat );
  533.  
  534.                 enqueue( path, stat, data );
  535.                 return true;
  536.  
  537.             } catch ( KeeperException.NoAuthException e ) {
  538.  
  539.                 //this is normal.. we no longer have permission to read the
  540.                 //file.
  541.                
  542.             } catch ( KeeperException.NoNodeException e ) {
  543.                
  544.                 //the file was removed between the event and when we attempted a
  545.                 //read.  This is probably fine but it would be nice if the
  546.                 //server would send over the data.
  547.                 watchNodeExists( path );
  548.  
  549.             } catch ( KeeperException.ConnectionLossException e ) {
  550.  
  551.                 //ConnectionLossException can be thrown when we attempt a call
  552.                 //over a zookeeper instance that is disconnected, even it if is
  553.                 //a short temporary disconnect.  We should only sleep here
  554.                 //because reconnect works by listening to disconnect events and
  555.                 //waiting until the session expires.
  556.  
  557.                 if ( noSpinOnFailure ) return false;
  558.  
  559.                 try {
  560.                     Thread.sleep( RECONNECT_SPIN_INTERVAL );
  561.                 } catch ( InterruptedException ie ) {
  562.                     return false;
  563.                 }
  564.  
  565.             } catch ( KeeperException e ) {
  566.  
  567.                 fireOnKeeperException( e );
  568.                 handleFailure();
  569.                
  570.             }
  571.  
  572.         }
  573.            
  574.     }
  575.    
  576.     /**
  577.      * Simple trace method.  I will migrate this to log4j later or just remove it.
  578.      *
  579.      */
  580.     private void trace( String format, Object... args ) {
  581.  
  582.         if ( trace == false )
  583.             return;
  584.        
  585.         System.out.printf( format, args );
  586.        
  587.     }
  588.    
  589. }
  590.  
  591. /**
  592.  * Used to represent an onData event within the queue.
  593.  *
  594.  */
  595. class OnDataEvent {
  596.     String path;
  597.     Stat stat;
  598.     byte[] data;
  599. }
  600.  
  /**
   * Used when we need to add an async watch for a node: the request is queued
   * by NodeWatcher.watch() and carried out later by the poll() thread.
   */
  class WatchNodeRequest {
      String path;

      public WatchNodeRequest( String path ) {
          this.path = path;
      }

      /**
       * Short description for trace output, which prints queue entries with %s.
       */
      @Override
      public String toString() {
          return "WatchNodeRequest[path=" + path + "]";
      }

  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement