- Thread-safe circular buffer in Java
- Server a: "Request id = ABCD" Status keeper=["ABCD"]
- Server b: "Request id = XYZZ" Status keeper=["ABCD", "XYZZ"]
- Server c: "Request id = 1234" Status keeper=["ABCD", "XYZZ", "1234"]
- Server b: "Request id = FOO" Status keeper=["XYZZ", "1234", "FOO"]
- Server a: "Request id = BAR" Status keeper=["1234", "FOO", "BAR"]
- Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());
- /**
- * Container
- * ---------
- *
- * A lock-free container that offers a close-to O(1) add/remove performance.
- *
- */
- public class Container<T> implements Iterable<T> {
- // The capacity of the container.
- final int capacity;
- // The list.
- AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
- // TESTING {
- AtomicLong totalAdded = new AtomicLong(0);
- AtomicLong totalFreed = new AtomicLong(0);
- AtomicLong totalSkipped = new AtomicLong(0);
- private void resetStats() {
- totalAdded.set(0);
- totalFreed.set(0);
- totalSkipped.set(0);
- }
- // TESTING }
- // Constructor
- public Container(int capacity) {
- this.capacity = capacity;
- // Construct the list.
- Node<T> h = new Node<T>();
- Node<T> it = h;
- // One created, now add (capacity - 1) more
- for (int i = 0; i < capacity - 1; i++) {
- // Add it.
- it.next = new Node<T>();
- // Step on to it.
- it = it.next;
- }
- // Make it a ring.
- it.next = h;
- // Install it.
- head.set(h);
- }
- // Empty ... NOT thread safe.
- public void clear() {
- Node<T> it = head.get();
- for (int i = 0; i < capacity; i++) {
- // Trash the element
- it.element = null;
- // Mark it free.
- it.free.set(true);
- it = it.next;
- }
- // Clear stats.
- resetStats();
- }
- // Add a new one.
- public Node<T> add(T element) {
- // Get a free node and attach the element.
- totalAdded.incrementAndGet();
- return getFree().attach(element);
- }
- // Find the next free element and mark it not free.
- private Node<T> getFree() {
- Node<T> freeNode = head.get();
- int skipped = 0;
- // Stop when we hit the end of the list
- // ... or we successfully transit a node from free to not-free.
- while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
- skipped += 1;
- freeNode = freeNode.next;
- }
- // Keep count of skipped.
- totalSkipped.addAndGet(skipped);
- if (skipped < capacity) {
- // Put the head as next.
- // Doesn't matter if it fails. That would just mean someone else was doing the same.
- head.set(freeNode.next);
- } else {
- // We hit the end! No more free nodes.
- throw new IllegalStateException("Capacity exhausted.");
- }
- return freeNode;
- }
- // Mark it free.
- public void remove(Node<T> it, T element) {
- totalFreed.incrementAndGet();
- // Remove the element first.
- it.detach(element);
- // Mark it as free.
- if (!it.free.compareAndSet(false, true)) {
- throw new IllegalStateException("Freeing a freed node.");
- }
- }
- // The Node class. It is static so needs the <T> repeated.
- public static class Node<T> {
- // The element in the node.
- private T element;
- // Are we free?
- private AtomicBoolean free = new AtomicBoolean(true);
- // The next reference in whatever list I am in.
- private Node<T> next;
- // Construct a node of the list
- private Node() {
- // Start empty.
- element = null;
- }
- // Attach the element.
- public Node<T> attach(T element) {
- // Sanity check.
- if (this.element == null) {
- this.element = element;
- } else {
- throw new IllegalArgumentException("There is already an element attached.");
- }
- // Useful for chaining.
- return this;
- }
- // Detach the element.
- public Node<T> detach(T element) {
- // Sanity check.
- if (this.element == element) {
- this.element = null;
- } else {
- throw new IllegalArgumentException("Removal of wrong element.");
- }
- // Useful for chaining.
- return this;
- }
- public T get () {
- return element;
- }
- @Override
- public String toString() {
- return element != null ? element.toString() : "null";
- }
- }
- // Provides an iterator across all items in the container.
- public Iterator<T> iterator() {
- return new UsedNodesIterator<T>(this);
- }
- // Iterates across used nodes.
- private static class UsedNodesIterator<T> implements Iterator<T> {
- // Where next to look for the next used node.
- Node<T> it;
- int limit = 0;
- T next = null;
- public UsedNodesIterator(Container<T> c) {
- // Snapshot the head node at this time.
- it = c.head.get();
- limit = c.capacity;
- }
- public boolean hasNext() {
- // Made into a `while` loop to fix issue reported by @Nim in code review
- while (next == null && limit > 0) {
- // Scan to the next non-free node.
- while (limit > 0 && it.free.get() == true) {
- it = it.next;
- // Step down 1.
- limit -= 1;
- }
- if (limit != 0) {
- next = it.element;
- }
- }
- return next != null;
- }
- public T next() {
- T n = null;
- if ( hasNext () ) {
- // Give it to them.
- n = next;
- next = null;
- // Step forward.
- it = it.next;
- limit -= 1;
- } else {
- // Not there!!
- throw new NoSuchElementException ();
- }
- return n;
- }
- public void remove() {
- throw new UnsupportedOperationException("Not supported.");
- }
- }
- @Override
- public String toString() {
- StringBuilder s = new StringBuilder();
- Separator comma = new Separator(",");
- // Keep counts too.
- int usedCount = 0;
- int freeCount = 0;
- // I will iterate the list myself as I want to count free nodes too.
- Node<T> it = head.get();
- int count = 0;
- s.append("[");
- // Scan to the end.
- while (count < capacity) {
- // Is it in-use?
- if (it.free.get() == false) {
- // Grab its element.
- T e = it.element;
- // Is it null?
- if (e != null) {
- // Good element.
- s.append(comma.sep()).append(e.toString());
- // Count them.
- usedCount += 1;
- } else {
- // Probably became free while I was traversing.
- // Because the element is detached before the entry is marked free.
- freeCount += 1;
- }
- } else {
- // Free one.
- freeCount += 1;
- }
- // Next
- it = it.next;
- count += 1;
- }
- // Decorate with counts "]used+free".
- s.append("]").append(usedCount).append("+").append(freeCount);
- if (usedCount + freeCount != capacity) {
- // Perhaps something was added/freed while we were iterating.
- s.append("?");
- }
- return s.toString();
- }
- }
- // ***** Following only needed for testing. *****
- private static boolean Debug = false;
- private final static String logName = "Container.log";
- private final static NamedFileOutput log = new NamedFileOutput("C:\Junk\");
- private static synchronized void log(boolean toStdoutToo, String s) {
- if (Debug) {
- if (toStdoutToo) {
- System.out.println(s);
- }
- log(s);
- }
- }
- private static synchronized void log(String s) {
- if (Debug) {
- try {
- log.writeLn(logName, s);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- }
- static volatile boolean testing = true;
- // Tester object to exercise the container.
- static class Tester<T> implements Runnable {
- // My name.
- T me;
- // The container I am testing.
- Container<T> c;
- public Tester(Container<T> container, T name) {
- c = container;
- me = name;
- }
- private void pause() {
- try {
- Thread.sleep(0);
- } catch (InterruptedException ex) {
- testing = false;
- }
- }
- public void run() {
- // Spin on add/remove until stopped.
- while (testing) {
- // Add it.
- Node<T> n = c.add(me);
- log("Added " + me + ": " + c.toString());
- pause();
- // Remove it.
- c.remove(n, me);
- log("Removed " + me + ": " + c.toString());
- pause();
- }
- }
- }
- static final String[] strings = {
- "One", "Two", "Three", "Four", "Five",
- "Six", "Seven", "Eight", "Nine", "Ten"
- };
- static final int TEST_THREADS = Math.min(10, strings.length);
- public static void main(String[] args) throws InterruptedException {
- Debug = true;
- log.delete(logName);
- Container<String> c = new Container<String>(10);
- // Simple add/remove
- log(true, "Simple test");
- Node<String> it = c.add(strings[0]);
- log("Added " + c.toString());
- c.remove(it, strings[0]);
- log("Removed " + c.toString());
- // Capacity test.
- log(true, "Capacity test");
- ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
- // Fill it.
- for (int i = 0; i < strings.length; i++) {
- nodes.add(i, c.add(strings[i]));
- log("Added " + strings[i] + " " + c.toString());
- }
- // Add one more.
- try {
- c.add("Wafer thin mint!");
- } catch (IllegalStateException ise) {
- log("Full!");
- }
- c.clear();
- log("Empty: " + c.toString());
- // Iterate test.
- log(true, "Iterator test");
- for (int i = 0; i < strings.length; i++) {
- nodes.add(i, c.add(strings[i]));
- }
- StringBuilder all = new StringBuilder ();
- Separator sep = new Separator(",");
- for (String s : c) {
- all.append(sep.sep()).append(s);
- }
- log("All: "+all);
- for (int i = 0; i < strings.length; i++) {
- c.remove(nodes.get(i), strings[i]);
- }
- sep.reset();
- all.setLength(0);
- for (String s : c) {
- all.append(sep.sep()).append(s);
- }
- log("None: " + all.toString());
- // Multiple add/remove
- log(true, "Multi test");
- for (int i = 0; i < strings.length; i++) {
- nodes.add(i, c.add(strings[i]));
- log("Added " + strings[i] + " " + c.toString());
- }
- log("Filled " + c.toString());
- for (int i = 0; i < strings.length - 1; i++) {
- c.remove(nodes.get(i), strings[i]);
- log("Removed " + strings[i] + " " + c.toString());
- }
- c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
- log("Empty " + c.toString());
- // Multi-threaded add/remove
- log(true, "Threads test");
- c.clear();
- for (int i = 0; i < TEST_THREADS; i++) {
- Thread t = new Thread(new Tester<String>(c, strings[i]));
- t.setName("Tester " + strings[i]);
- log("Starting " + t.getName());
- t.start();
- }
- // Wait for 10 seconds.
- long stop = System.currentTimeMillis() + 10 * 1000;
- while (System.currentTimeMillis() < stop) {
- Thread.sleep(100);
- }
- // Stop the testers.
- testing = false;
- // Wait some more.
- Thread.sleep(1 * 100);
- // Get stats.
- double added = c.totalAdded.doubleValue();
- double skipped = c.totalSkipped.doubleValue();
- //double freed = c.freed.doubleValue();
- log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
- }
- public void writeRequest(String requestID) {
- synchronized(buffer) {
- buffer.add(requestID);
- }
- }
- public Collection<String> getRequests() {
- synchronized(buffer) {
- return buffer.clone();
- }
- }