Advertisement
bestsss

SC/MP Queue

Apr 14th, 2014
317
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 4.30 KB | None | 0 0
  1. package bestsss.util;
  2.  
  3. import java.lang.reflect.Field;
  4.  
  5. import java.util.concurrent.atomic.AtomicReference;
  6. import java.util.concurrent.locks.LockSupport;
  7.  
  8. import sun.misc.Unsafe;
  /**
   * A bounded, linked, non-blocking-offer queue for a single consumer and multiple producers.
   *
   * <p>Producers append with a CAS on {@code tail}; the consumer advances {@code head} with plain
   * volatile writes (no CAS), so {@link #poll()}, {@link #poll(long)} and {@link #take()} must only
   * ever be called from one thread at a time. {@link #offer(Object)} may be called from any thread.
   *
   * <p>Every node carries a sequence number ({@code counter}) assigned at enqueue time; the distance
   * between the counters of the last and first nodes is used for {@link #size()} and for the bound
   * check. The subtraction is done with wrapping arithmetic, so counter overflow is tolerated.
   *
   * <p>The present implementation does not use any "slack" on {@code offer()}: {@code tail} always
   * points at the real last node. It could be optimized by allowing the actual tail to drift.
   *
   * <p>The code is in the public domain: http://creativecommons.org/publicdomain/zero/1.0/
   *
   * @author Stanimir Simeonoff
   * @param <E> the element type; {@code null} elements are not permitted
   */
  public class SCMPQueue<E> {

      /** Linked-list cell. Kept non-generic on purpose; {@link #get()} performs the cast. */
      private static class Node {
          /** Successor; written by the producer that won the tail CAS, after the CAS. */
          volatile Node next;

          /**
           * The payload, or {@code null} for a consumed/sentinel node. It may be read during
           * {@code offer()} while the consumer clears it, but that race is benign: it is only
           * cleared on the last node of the queue, so it does not need to be volatile.
           */
          Object item;

          /** Sequence number; not modified once the node has been enqueued. */
          long counter;

          Node(Object item) {
              this.item = item;
          }

          /** Returns the payload cast to the caller's element type. */
          @SuppressWarnings("unchecked")
          <E> E get() {
              return (E) item;
          }
      }

      /**
       * CAS access to {@link #tail}. Uses the supported java.util.concurrent API instead of
       * reflecting into {@code sun.misc.Unsafe}, which is an internal, unsupported class.
       */
      @SuppressWarnings("rawtypes")
      private static final java.util.concurrent.atomic.AtomicReferenceFieldUpdater<SCMPQueue, Node> TAIL_UPDATER =
              java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater(
                      SCMPQueue.class, Node.class, "tail");

      // head and tail (and poller below) are candidates for padding / @Contended.
      /** First node, or an already-consumed sentinel preceding it. Written only by the consumer. */
      private volatile Node head;
      /** Last node. Advanced by producers through {@link #casTail(Node, Node)}. */
      private volatile Node tail;

      /** The consumer thread that is (about to be) parked waiting for an element, or {@code null}. */
      private final AtomicReference<Thread> poller = new AtomicReference<Thread>();

      /** Largest accepted counter distance between a new node and the first node. */
      private final long bound;

      /**
       * Creates an empty queue.
       *
       * <p>Note that the check in {@link #offer(Object)} compares the counter distance between the
       * new node and the first node against {@code bound}, so the queue accepts up to
       * {@code bound + 1} elements (a bound of {@code 0} still admits one element).
       *
       * @param bound the bound; must be in the range {@code [0, Long.MAX_VALUE)}
       * @throws IllegalArgumentException if {@code bound} is negative or {@code Long.MAX_VALUE}
       */
      public SCMPQueue(long bound) {
          if (bound < 0 || bound == Long.MAX_VALUE) {
              throw new IllegalArgumentException("bound out of range: " + bound);
          }
          this.bound = bound;
          // Start with a sentinel node (null item, counter 0) shared by head and tail.
          head = tail = new Node(null);
      }

      /**
       * Appends the element unless the queue is full. Never blocks; safe to call from any thread.
       *
       * @param e the element to add
       * @return {@code true} if the element was enqueued, {@code false} if the bound was exceeded
       * @throws NullPointerException if {@code e} is {@code null}
       */
      public boolean offer(E e) {
          if (e == null) {
              throw new NullPointerException("null elements are not permitted");
          }
          final Node node = new Node(e);
          for (;;) {
              final Node first = first();
              final Node last = tail;
              // The node is still private to this thread, so the counter may be rewritten on retry.
              node.counter = last.counter + 1;
              // NOTE(review): while first() returns null (queue empty, or the node following an
              // emptied head has not been linked yet) no bound check is performed at all.
              if (first != null && sub(node.counter, first.counter) > bound) {
                  return false;
              }
              // A successful CAS proves 'last' was still the tail, hence the counter is correct;
              // it is immutable from now on.
              if (casTail(last, node)) {
                  // Under load and frequent poll() the queue may appear empty if this thread loses
                  // its timeslice (or is "killed") between the CAS above and this link.
                  last.next = node;
                  wakeup();
                  return true;
              }
          }
      }

      /** Unparks the registered consumer, if any; only the thread that clears the slot unparks. */
      private void wakeup() {
          final Thread waiter = poller.get();
          if (waiter != null && poller.compareAndSet(waiter, null)) {
              LockSupport.unpark(waiter);
          }
      }

      /**
       * Removes and returns the first element without blocking. Consumer thread only.
       *
       * @return the first element, or {@code null} if no element is reachable from {@code head}
       */
      public E poll() {
          final Node h = head;
          Node current = h;
          for (;;) {
              final E item = current.get();
              final Node next = current.next;
              if (item != null) { // i.e. a live node
                  if (next == null) {
                      // Last node: it cannot be unlinked, so clear it and keep it as the sentinel.
                      current.item = null;
                      if (current != h) { // avoid an unneeded volatile write
                          head = current;
                      }
                  } else {
                      head = next;
                  }
                  return item;
              }
              // The item is null: this node has already been consumed.
              if (next == null) {
                  return null;
              }
              current = next;
          }
      }

      /**
       * One poll / register / re-poll / park / poll round.
       *
       * <p>May return {@code null} spuriously (early unpark, interrupt, timeout); callers decide
       * whether to retry.
       *
       * @param nanos maximum time to park in nanoseconds; a negative value parks without a timeout
       * @return an element, or {@code null} if none was found in this round
       */
      private E pollImpl(long nanos) {
          E result = poll();
          if (result != null) {
              return result;
          }

          final Thread self = Thread.currentThread();
          poller.set(self);
          try {
              // Check once again now that producers can see this thread as the waiter.
              result = poll();
              if (result != null) {
                  return result;
              }

              // If a producer already cleared the slot it has unparked us (or is about to); skip parking.
              if (poller.get() == self) {
                  if (nanos < 0) {
                      LockSupport.park(poller);
                  } else {
                      LockSupport.parkNanos(poller, nanos);
                  }
              }
          } finally {
              poller.lazySet(null);
          }
          return poll();
      }

      /**
       * Removes and returns the first element, waiting up to the given time for one to arrive.
       * Consumer thread only.
       *
       * <p>Spurious wake-ups do not shorten the wait: the method keeps waiting until an element is
       * obtained, the thread is interrupted, or the timeout has elapsed.
       *
       * @param nanos maximum time to wait in nanoseconds; values {@code <= 0} mean "do not wait"
       * @return the first element, or {@code null} if the timeout elapsed first
       * @throws InterruptedException if no element was obtained and the thread was interrupted
       */
      public E poll(long nanos) throws InterruptedException {
          long remaining = Math.max(nanos, 0);
          // Wrapping arithmetic is fine here: only the difference to nanoTime() is ever used.
          final long deadline = System.nanoTime() + remaining;
          for (;;) {
              final E e = pollImpl(remaining);
              if (e != null) {
                  return e;
              }
              if (Thread.interrupted()) {
                  throw new InterruptedException();
              }
              remaining = deadline - System.nanoTime();
              if (remaining <= 0) {
                  return null;
              }
          }
      }

      /**
       * Removes and returns the first element, waiting as long as necessary. Consumer thread only.
       *
       * @return the first element, never {@code null}
       * @throws InterruptedException if the thread was interrupted while no element was available
       */
      public E take() throws InterruptedException {
          for (;;) {
              final E e = pollImpl(-1);
              if (e != null) {
                  return e;
              }
              if (Thread.interrupted()) {
                  throw new InterruptedException();
              }
          }
      }

      /**
       * Returns the number of elements, derived from the counters of the last and first nodes.
       * {@code head} and {@code tail} are read separately, so under concurrent modification the
       * value is only an estimate.
       *
       * @return {@code 0} if no first node is visible, otherwise {@code 1 + (tail - first)} counters
       */
      public long size() {
          final Node first = first();
          if (first == null) {
              return 0;
          }
          return 1 + sub(tail.counter, first.counter);
      }

      /** Wrapping difference {@code tail - head} of two node counters. */
      static long sub(long tail, long head) {
          return tail - head;
      }

      /**
       * Returns the first live node: {@code head} itself if it still holds an item, otherwise its
       * successor (which may be {@code null}).
       */
      private Node first() {
          final Node node = head;
          if (node.get() != null) {
              return node;
          }
          return node.next;
      }

      /**
       * @return {@code true} if no live node is currently visible from {@code head}
       */
      public boolean isEmpty() {
          return first() == null;
      }

      /** Atomically replaces {@code tail} with {@code val} if it currently is {@code cmp}. */
      private boolean casTail(Node cmp, Node val) {
          return TAIL_UPDATER.compareAndSet(this, cmp, val);
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement