Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package bestsss.util;
- import java.lang.reflect.Field;
- import java.util.concurrent.atomic.AtomicReference;
- import java.util.concurrent.locks.LockSupport;
- import sun.misc.Unsafe;
- /**
- *
- * @author Stanimir Simeonoff
- * The code is in public domain: http://creativecommons.org/publicdomain/zero/1.0/
- * Present impl. doesn't use any "slack" on offer() but it can be optimized allowing the actual tail to drift.
- * size() attempts to use unsigned arithmetics.
- */
- public class SCMPQueue<E> {
- private static class Node {
- Node(Object item){
- this.item = item;
- }
- volatile Node next;
- //might be read during offer(), but the race is benign - it's modified only on empty queue, so no volatile
- Object item;
- //no modified after enqueued
- long counter;
- @SuppressWarnings("unchecked")
- <E> E get(){//writing Node<E> every single time is boring
- return (E) item;
- }
- }
- private volatile Node head, tail;//pad them (attributes)@Contended
- private final AtomicReference<Thread> poller =new AtomicReference<Thread>();//has to be padded too
- private final long bound;
- public SCMPQueue(long bound){
- if (bound<0 || bound == Long.MAX_VALUE) throw new IllegalArgumentException();
- this.bound = bound;
- head = tail = new Node(null);
- }
- public boolean offer(E e){//very simplistic offer
- if (e==null)
- throw new NullPointerException();
- for(final Node node = new Node(e);;){
- Node head = first();
- Node t = tail;
- node.counter = t.counter +1;
- if (head!=null && sub(node.counter, head.counter)>bound){
- return false;
- }
- if (casTail(t, node)){//CAS, so counter is set properly, it's immutable from now on
- t.next = node;//under load and frequent poll() the queue may appear empty, if the thread loses the timeslice or it's "killed" at this point
- wakeup();
- return true;
- }
- }
- }
- private void wakeup() {
- Thread t = poller.get();
- if (t!=null && poller.compareAndSet(t, null)){
- LockSupport.unpark(t);
- }
- }
- public E poll() {
- for (Node h = head,current = h;;){
- final E item = current.get();
- final Node next = current.next;
- if (item!=null){//i.e. relevant
- if (next==null){
- current.item = null;
- if (current!=h)//set head, check to avoid unneeded writes
- head = current;
- } else{
- head = next;
- }
- return item;
- }
- //item is set to null, it has been unlinked
- if (next==null)
- return null;
- current = next;
- }
- }
- private E pollImpl(long nanos){//may spuriously return null, left for outer layers to handle
- E result = poll();
- if (result!=null)
- return result;
- final Thread t = Thread.currentThread();
- poller.set(t);
- try{
- //check once again, under the "lock"
- result = poll();
- if (result!=null)
- return result;
- if (poller.get()==t){
- if (nanos<0)
- LockSupport.park(poller);
- else
- LockSupport.parkNanos(poller, nanos);
- }
- }finally{
- poller.lazySet(null);
- }
- return poll();
- }
- public E poll(long nanos) throws InterruptedException{//doesn't care for spurious wakeups
- E e = pollImpl(Math.max(nanos, 0));
- if (e!=null)
- return e;
- if (Thread.interrupted())
- throw new InterruptedException();
- return null;
- }
- public E take() throws InterruptedException{
- for(;;){
- E e = pollImpl(-1);
- if (e!=null)
- return e;
- if (Thread.interrupted())
- throw new InterruptedException();
- }
- }
- public long size(){
- Node head = first();
- return head==null?0:1+sub(tail.counter, head.counter);
- }
- static long sub(long tail, long head){
- return tail-head;
- }
- private Node first() {
- Node node = head;
- if (node.get()!=null)
- return node;
- return node.next;
- }
- public boolean isEmpty() {
- return first() == null;
- }
- private boolean casTail(Node cmp, Node val) {
- return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
- }
- private static final sun.misc.Unsafe UNSAFE;
- private static final long tailOffset;
- static {
- try {
- Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- UNSAFE = (Unsafe) f.get(null);
- tailOffset = UNSAFE.objectFieldOffset(SCMPQueue.class.getDeclaredField("tail"));
- } catch (Exception e) {
- throw new Error(e);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement