Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Jul 18th, 2012  |  syntax: None  |  size: 10.96 KB  |  hits: 6  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. Thread-safe circular buffer in Java
  2. Server a: "Request id = ABCD"        Status keeper=["ABCD"]
  3. Server b: "Request id = XYZZ"        Status keeper=["ABCD", "XYZZ"]
  4. Server c: "Request id = 1234"        Status keeper=["ABCD", "XYZZ", "1234"]
  5. Server b: "Request id = FOO"         Status keeper=["XYZZ", "1234", "FOO"]
  6. Server a: "Request id = BAR"         Status keeper=["1234", "FOO", "BAR"]
  7.        
  8. Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());
  9.        
  10. /**
  11.  * Container
  12.  * ---------
  13.  *
  14.  * A lock-free container that offers a close-to O(1) add/remove performance.
  15.  *
  16.  */
  17. public class Container<T> implements Iterable<T> {
  18.  
  19.   // The capacity of the container.
  20.   final int capacity;
  21.   // The list.
  22.   AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
  23.   // TESTING {
  24.   AtomicLong totalAdded = new AtomicLong(0);
  25.   AtomicLong totalFreed = new AtomicLong(0);
  26.   AtomicLong totalSkipped = new AtomicLong(0);
  27.  
  28.   private void resetStats() {
  29.     totalAdded.set(0);
  30.     totalFreed.set(0);
  31.     totalSkipped.set(0);
  32.   }
  33.   // TESTING }
  34.  
  35.   // Constructor
  36.   public Container(int capacity) {
  37.     this.capacity = capacity;
  38.     // Construct the list.
  39.     Node<T> h = new Node<T>();
  40.     Node<T> it = h;
  41.     // One created, now add (capacity - 1) more
  42.     for (int i = 0; i < capacity - 1; i++) {
  43.       // Add it.
  44.       it.next = new Node<T>();
  45.       // Step on to it.
  46.       it = it.next;
  47.     }
  48.     // Make it a ring.
  49.     it.next = h;
  50.     // Install it.
  51.     head.set(h);
  52.   }
  53.  
  54.   // Empty ... NOT thread safe.
  55.   public void clear() {
  56.     Node<T> it = head.get();
  57.     for (int i = 0; i < capacity; i++) {
  58.       // Trash the element
  59.       it.element = null;
  60.       // Mark it free.
  61.       it.free.set(true);
  62.       it = it.next;
  63.     }
  64.     // Clear stats.
  65.     resetStats();
  66.   }
  67.  
  68.   // Add a new one.
  69.   public Node<T> add(T element) {
  70.     // Get a free node and attach the element.
  71.     totalAdded.incrementAndGet();
  72.     return getFree().attach(element);
  73.   }
  74.  
  75.   // Find the next free element and mark it not free.
  76.   private Node<T> getFree() {
  77.     Node<T> freeNode = head.get();
  78.     int skipped = 0;
  79.     // Stop when we hit the end of the list
  80.     // ... or we successfully transit a node from free to not-free.
  81.     while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
  82.       skipped += 1;
  83.       freeNode = freeNode.next;
  84.     }
  85.     // Keep count of skipped.
  86.     totalSkipped.addAndGet(skipped);
  87.     if (skipped < capacity) {
  88.       // Put the head as next.
  89.       // Doesn't matter if it fails. That would just mean someone else was doing the same.
  90.       head.set(freeNode.next);
  91.     } else {
  92.       // We hit the end! No more free nodes.
  93.       throw new IllegalStateException("Capacity exhausted.");
  94.     }
  95.     return freeNode;
  96.   }
  97.  
  98.   // Mark it free.
  99.   public void remove(Node<T> it, T element) {
  100.     totalFreed.incrementAndGet();
  101.     // Remove the element first.
  102.     it.detach(element);
  103.     // Mark it as free.
  104.     if (!it.free.compareAndSet(false, true)) {
  105.       throw new IllegalStateException("Freeing a freed node.");
  106.     }
  107.   }
  108.  
  109.   // The Node class. It is static so needs the <T> repeated.
  110.   public static class Node<T> {
  111.  
  112.     // The element in the node.
  113.     private T element;
  114.     // Are we free?
  115.     private AtomicBoolean free = new AtomicBoolean(true);
  116.     // The next reference in whatever list I am in.
  117.     private Node<T> next;
  118.  
  119.     // Construct a node of the list
  120.     private Node() {
  121.       // Start empty.
  122.       element = null;
  123.     }
  124.  
  125.     // Attach the element.
  126.     public Node<T> attach(T element) {
  127.       // Sanity check.
  128.       if (this.element == null) {
  129.         this.element = element;
  130.       } else {
  131.         throw new IllegalArgumentException("There is already an element attached.");
  132.       }
  133.       // Useful for chaining.
  134.       return this;
  135.     }
  136.  
  137.     // Detach the element.
  138.     public Node<T> detach(T element) {
  139.       // Sanity check.
  140.       if (this.element == element) {
  141.         this.element = null;
  142.       } else {
  143.         throw new IllegalArgumentException("Removal of wrong element.");
  144.       }
  145.       // Useful for chaining.
  146.       return this;
  147.     }
  148.  
  149.     public T get () {
  150.       return element;
  151.     }
  152.  
  153.     @Override
  154.     public String toString() {
  155.       return element != null ? element.toString() : "null";
  156.     }
  157.   }
  158.  
  159.   // Provides an iterator across all items in the container.
  160.   public Iterator<T> iterator() {
  161.     return new UsedNodesIterator<T>(this);
  162.   }
  163.  
  164.   // Iterates across used nodes.
  165.   private static class UsedNodesIterator<T> implements Iterator<T> {
  166.     // Where next to look for the next used node.
  167.  
  168.     Node<T> it;
  169.     int limit = 0;
  170.     T next = null;
  171.  
  172.     public UsedNodesIterator(Container<T> c) {
  173.       // Snapshot the head node at this time.
  174.       it = c.head.get();
  175.       limit = c.capacity;
  176.     }
  177.  
  178.     public boolean hasNext() {
  179.       // Made into a `while` loop to fix issue reported by @Nim in code review
  180.       while (next == null && limit > 0) {
  181.         // Scan to the next non-free node.
  182.         while (limit > 0 && it.free.get() == true) {
  183.           it = it.next;
  184.           // Step down 1.
  185.           limit -= 1;
  186.         }
  187.         if (limit != 0) {
  188.           next = it.element;
  189.         }
  190.       }
  191.       return next != null;
  192.     }
  193.  
  194.     public T next() {
  195.       T n = null;
  196.       if ( hasNext () ) {
  197.         // Give it to them.
  198.         n = next;
  199.         next = null;
  200.         // Step forward.
  201.         it = it.next;
  202.         limit -= 1;
  203.       } else {
  204.         // Not there!!
  205.         throw new NoSuchElementException ();
  206.       }
  207.       return n;
  208.     }
  209.  
  210.     public void remove() {
  211.       throw new UnsupportedOperationException("Not supported.");
  212.     }
  213.   }
  214.  
  215.   @Override
  216.   public String toString() {
  217.     StringBuilder s = new StringBuilder();
  218.     Separator comma = new Separator(",");
  219.     // Keep counts too.
  220.     int usedCount = 0;
  221.     int freeCount = 0;
  222.     // I will iterate the list myself as I want to count free nodes too.
  223.     Node<T> it = head.get();
  224.     int count = 0;
  225.     s.append("[");
  226.     // Scan to the end.
  227.     while (count < capacity) {
  228.       // Is it in-use?
  229.       if (it.free.get() == false) {
  230.         // Grab its element.
  231.         T e = it.element;
  232.         // Is it null?
  233.         if (e != null) {
  234.           // Good element.
  235.           s.append(comma.sep()).append(e.toString());
  236.           // Count them.
  237.           usedCount += 1;
  238.         } else {
  239.           // Probably became free while I was traversing.
  240.           // Because the element is detached before the entry is marked free.
  241.           freeCount += 1;
  242.         }
  243.       } else {
  244.         // Free one.
  245.         freeCount += 1;
  246.       }
  247.       // Next
  248.       it = it.next;
  249.       count += 1;
  250.     }
  251.     // Decorate with counts "]used+free".
  252.     s.append("]").append(usedCount).append("+").append(freeCount);
  253.     if (usedCount + freeCount != capacity) {
  254.       // Perhaps something was added/freed while we were iterating.
  255.       s.append("?");
  256.     }
  257.     return s.toString();
  258.   }
  259. }
  260.        
  261. // ***** Following only needed for testing. *****
  262. private static boolean Debug = false;
  263. private final static String logName = "Container.log";
  264. private final static NamedFileOutput log = new NamedFileOutput("C:\Junk\");
  265.  
  266. private static synchronized void log(boolean toStdoutToo, String s) {
  267.   if (Debug) {
  268.     if (toStdoutToo) {
  269.       System.out.println(s);
  270.     }
  271.     log(s);
  272.   }
  273. }
  274.  
  275. private static synchronized void log(String s) {
  276.   if (Debug) {
  277.     try {
  278.       log.writeLn(logName, s);
  279.     } catch (IOException ex) {
  280.       ex.printStackTrace();
  281.     }
  282.   }
  283. }
  284. static volatile boolean testing = true;
  285.  
  286. // Tester object to exercise the container.
  287. static class Tester<T> implements Runnable {
  288.   // My name.
  289.  
  290.   T me;
  291.   // The container I am testing.
  292.   Container<T> c;
  293.  
  294.   public Tester(Container<T> container, T name) {
  295.     c = container;
  296.     me = name;
  297.   }
  298.  
  299.   private void pause() {
  300.     try {
  301.       Thread.sleep(0);
  302.     } catch (InterruptedException ex) {
  303.       testing = false;
  304.     }
  305.   }
  306.  
  307.   public void run() {
  308.     // Spin on add/remove until stopped.
  309.     while (testing) {
  310.       // Add it.
  311.       Node<T> n = c.add(me);
  312.       log("Added " + me + ": " + c.toString());
  313.       pause();
  314.       // Remove it.
  315.       c.remove(n, me);
  316.       log("Removed " + me + ": " + c.toString());
  317.       pause();
  318.     }
  319.   }
  320. }
  321. static final String[] strings = {
  322.   "One", "Two", "Three", "Four", "Five",
  323.   "Six", "Seven", "Eight", "Nine", "Ten"
  324. };
  325. static final int TEST_THREADS = Math.min(10, strings.length);
  326.  
  327. public static void main(String[] args) throws InterruptedException {
  328.   Debug = true;
  329.   log.delete(logName);
  330.   Container<String> c = new Container<String>(10);
  331.  
  332.   // Simple add/remove
  333.   log(true, "Simple test");
  334.   Node<String> it = c.add(strings[0]);
  335.   log("Added " + c.toString());
  336.   c.remove(it, strings[0]);
  337.   log("Removed " + c.toString());
  338.  
  339.   // Capacity test.
  340.   log(true, "Capacity test");
  341.   ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
  342.   // Fill it.
  343.   for (int i = 0; i < strings.length; i++) {
  344.     nodes.add(i, c.add(strings[i]));
  345.     log("Added " + strings[i] + " " + c.toString());
  346.   }
  347.   // Add one more.
  348.   try {
  349.     c.add("Wafer thin mint!");
  350.   } catch (IllegalStateException ise) {
  351.     log("Full!");
  352.   }
  353.   c.clear();
  354.   log("Empty: " + c.toString());
  355.  
  356.   // Iterate test.
  357.   log(true, "Iterator test");
  358.   for (int i = 0; i < strings.length; i++) {
  359.     nodes.add(i, c.add(strings[i]));
  360.   }
  361.   StringBuilder all = new StringBuilder ();
  362.   Separator sep = new Separator(",");
  363.   for (String s : c) {
  364.     all.append(sep.sep()).append(s);
  365.   }
  366.   log("All: "+all);
  367.   for (int i = 0; i < strings.length; i++) {
  368.     c.remove(nodes.get(i), strings[i]);
  369.   }
  370.   sep.reset();
  371.   all.setLength(0);
  372.   for (String s : c) {
  373.     all.append(sep.sep()).append(s);
  374.   }
  375.   log("None: " + all.toString());
  376.  
  377.   // Multiple add/remove
  378.   log(true, "Multi test");
  379.   for (int i = 0; i < strings.length; i++) {
  380.     nodes.add(i, c.add(strings[i]));
  381.     log("Added " + strings[i] + " " + c.toString());
  382.   }
  383.   log("Filled " + c.toString());
  384.   for (int i = 0; i < strings.length - 1; i++) {
  385.     c.remove(nodes.get(i), strings[i]);
  386.     log("Removed " + strings[i] + " " + c.toString());
  387.   }
  388.   c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
  389.   log("Empty " + c.toString());
  390.  
  391.   // Multi-threaded add/remove
  392.   log(true, "Threads test");
  393.   c.clear();
  394.   for (int i = 0; i < TEST_THREADS; i++) {
  395.     Thread t = new Thread(new Tester<String>(c, strings[i]));
  396.     t.setName("Tester " + strings[i]);
  397.     log("Starting " + t.getName());
  398.     t.start();
  399.   }
  400.   // Wait for 10 seconds.
  401.   long stop = System.currentTimeMillis() + 10 * 1000;
  402.   while (System.currentTimeMillis() < stop) {
  403.     Thread.sleep(100);
  404.   }
  405.   // Stop the testers.
  406.   testing = false;
  407.   // Wait some more.
  408.   Thread.sleep(1 * 100);
  409.   // Get stats.
  410.   double added = c.totalAdded.doubleValue();
  411.   double skipped = c.totalSkipped.doubleValue();
  412.   //double freed = c.freed.doubleValue();
  413.   log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
  414. }
  415.        
  416. public void writeRequest(String requestID) {
  417.     synchronized(buffer) {
  418.        buffer.add(requestID);
  419.     }
  420. }
  421.  
  422. public Collection<String> getRequests() {
  423.      synchronized(buffer) {
  424.         return buffer.clone();
  425.      }
  426. }