Advertisement
Guest User

Untitled

a guest
Jul 15th, 2013
209
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 16.24 KB | None | 0 0
  1. package jsr166y;
  2.  
  3. import java.io.IOException;
  4. import java.io.ObjectInputStream;
  5. import java.io.ObjectOutputStream;
  6. import java.io.Serializable;
  7. import java.lang.reflect.Field;
  8. import java.security.AccessController;
  9. import java.security.PrivilegedActionException;
  10. import java.security.PrivilegedExceptionAction;
  11. import java.util.AbstractQueue;
  12. import java.util.Collection;
  13. import java.util.Iterator;
  14. import java.util.NoSuchElementException;
  15. import java.util.concurrent.TimeUnit;
  16. import java.util.concurrent.locks.LockSupport;
  17. import sun.misc.Unsafe;
  18.  
  19. public class LinkedTransferQueue<E> extends AbstractQueue<E>
  20. implements TransferQueue<E>, Serializable
  21. {
  22. private static final long serialVersionUID = -3223113410248163686L;
  23. private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
  24. private static final int FRONT_SPINS = 128;
  25. private static final int CHAINED_SPINS = 64;
  26. static final int SWEEP_THRESHOLD = 32;
  27. volatile transient Node head;
  28. private volatile transient Node tail;
  29. private volatile transient int sweepVotes;
  30. private static final int NOW = 0;
  31. private static final int ASYNC = 1;
  32. private static final int SYNC = 2;
  33. private static final int TIMED = 3;
  34. private static final Unsafe UNSAFE = getUnsafe();
  35.  
  36. private static final long headOffset = objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
  37.  
  38. private static final long tailOffset = objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
  39.  
  40. private static final long sweepVotesOffset = objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);
  41.  
  42. private boolean casTail(Node cmp, Node val)
  43. {
  44. return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
  45. }
  46.  
  47. private boolean casHead(Node cmp, Node val) {
  48. return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
  49. }
  50.  
  51. private boolean casSweepVotes(int cmp, int val) {
  52. return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
  53. }
  54.  
  55. static <E> E cast(Object item)
  56. {
  57. assert ((item == null) || (item.getClass() != Node.class));
  58. return item;
  59. }
  60.  
  61. private E xfer(E e, boolean haveData, int how, long nanos)
  62. {
  63. if ((haveData) && (e == null))
  64. throw new NullPointerException();
  65. Node s = null;
  66. Node pred;
  67. do {
  68. Node h = this.head; for (Node p = h; p != null; ) {
  69. boolean isData = p.isData;
  70. Object item = p.item;
  71. if (item != p) if ((item != null) == isData) {
  72. if (isData == haveData)
  73. break;
  74. if (p.casItem(item, e)) {
  75. for (Node q = p; q != h; ) {
  76. Node n = q.next;
  77. if (this.head == h) if (casHead(h, n == null ? q : n)) {
  78. h.forgetNext();
  79. break;
  80. }
  81. if (((h = this.head) == null) ||
  82. ((q = h.next) == null) || (!q.isMatched()))
  83. break;
  84. }
  85. LockSupport.unpark(p.waiter);
  86. return cast(item);
  87. }
  88. }
  89. Node n = p.next;
  90. p = h = this.head;
  91. }
  92.  
  93. if (how == 0) break;
  94. if (s == null)
  95. s = new Node(e, haveData);
  96. pred = tryAppend(s, haveData);
  97. }while (pred == null);
  98.  
  99. if (how != 1) {
  100. return awaitMatch(s, pred, e, how == 3, nanos);
  101. }
  102. return e;
  103. }
  104.  
  105. private Node tryAppend(Node s, boolean haveData)
  106. {
  107. Node t = this.tail; Node p = t;
  108. while (true)
  109. if ((p == null) && ((p = this.head) == null)) {
  110. if (casHead(null, s))
  111. return s;
  112. } else {
  113. if (p.cannotPrecede(haveData))
  114. return null;
  115. Node n;
  116. if ((n = p.next) != null)
  117. {
  118. Node u;
  119. p =
  120. p != n ? n : (p != t) && (t != (u = this.tail)) ? (t = u) :
  121. null; } else {
  122. if (p.casNext(null, s)) break;
  123. p = p.next;
  124. }
  125. }
  126. while ((p != t) &&
  127. ((this.tail != t) || (!casTail(t, s))) &&
  128. ((t = this.tail) != null) &&
  129. ((s = t.next) != null) &&
  130. ((s = s.next) != null) && (s != t));
  131. return p;
  132. }
  133.  
  134. private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos)
  135. {
  136. long lastTime = timed ? System.nanoTime() : 0L;
  137. Thread w = Thread.currentThread();
  138. int spins = -1;
  139. ThreadLocalRandom randomYields = null;
  140. while (true)
  141. {
  142. Object item = s.item;
  143. if (item != e) {
  144. assert (item != s);
  145. s.forgetContents();
  146. return cast(item);
  147. }
  148. if (((w.isInterrupted()) || ((timed) && (nanos <= 0L))) &&
  149. (s.casItem(e, s))) {
  150. unsplice(pred, s);
  151. return e;
  152. }
  153.  
  154. if (spins < 0) {
  155. if ((spins = spinsFor(pred, s.isData)) > 0)
  156. randomYields = ThreadLocalRandom.current();
  157. }
  158. else if (spins > 0) {
  159. spins--;
  160. if (randomYields.nextInt(64) == 0)
  161. Thread.yield();
  162. }
  163. else if (s.waiter == null) {
  164. s.waiter = w;
  165. }
  166. else if (timed) {
  167. long now = System.nanoTime();
  168. if (nanos -= now - lastTime > 0L)
  169. {
  170. LockSupport.parkNanos(this, nanos);
  171. }
  172. lastTime = now;
  173. }
  174. else
  175. {
  176. LockSupport.park(this);
  177. }
  178. }
  179. }
  180.  
  181. private static int spinsFor(Node pred, boolean haveData)
  182. {
  183. if ((MP) && (pred != null)) {
  184. if (pred.isData != haveData)
  185. return 192;
  186. if (pred.isMatched())
  187. return 128;
  188. if (pred.waiter == null)
  189. return 64;
  190. }
  191. return 0;
  192. }
  193.  
  194. final Node succ(Node p)
  195. {
  196. Node next = p.next;
  197. return p == next ? this.head : next;
  198. }
  199.  
  200. private Node firstOfMode(boolean isData)
  201. {
  202. for (Node p = this.head; p != null; p = succ(p)) {
  203. if (!p.isMatched())
  204. return p.isData == isData ? p : null;
  205. }
  206. return null;
  207. }
  208.  
  209. private E firstDataItem()
  210. {
  211. for (Node p = this.head; p != null; p = succ(p)) {
  212. Object item = p.item;
  213. if (p.isData) {
  214. if ((item != null) && (item != p))
  215. return cast(item);
  216. }
  217. else if (item == null)
  218. return null;
  219. }
  220. return null;
  221. }
  222.  
  223. private int countOfMode(boolean data)
  224. {
  225. int count = 0;
  226. for (Node p = this.head; p != null; ) {
  227. if (!p.isMatched()) {
  228. if (p.isData != data)
  229. return 0;
  230. count++; if (count == 2147483647)
  231. break;
  232. }
  233. Node n = p.next;
  234. if (n != p) {
  235. p = n;
  236. } else {
  237. count = 0;
  238. p = this.head;
  239. }
  240. }
  241. return count;
  242. }
  243.  
  244. final void unsplice(Node pred, Node s)
  245. {
  246. s.forgetContents();
  247.  
  248. if ((pred != null) && (pred != s) && (pred.next == s)) {
  249. Node n = s.next;
  250. if ((n == null) || (
  251. (n != s) && (pred.casNext(s, n)) && (pred.isMatched()))) {
  252. while (true) {
  253. Node h = this.head;
  254. if ((h == pred) || (h == s) || (h == null))
  255. return;
  256. if (!h.isMatched())
  257. break;
  258. Node hn = h.next;
  259. if (hn == null)
  260. return;
  261. if ((hn != h) && (casHead(h, hn)))
  262. h.forgetNext();
  263. }
  264. if ((pred.next != pred) && (s.next != s)) {
  265. int v;
  266. do { do { v = this.sweepVotes;
  267. if (v >= 32) break; }
  268. while (!casSweepVotes(v, v + 1));
  269. break;
  270. }
  271. while (!casSweepVotes(v, 0));
  272. sweep();
  273. }
  274. }
  275. }
  276. }
  277.  
  278. private void sweep()
  279. {
  280. Node s;
  281. for (Node p = this.head; (p != null) && ((s = p.next) != null); )
  282. {
  283. Node s;
  284. if (p == s) {
  285. p = this.head;
  286. } else if (!s.isMatched()) {
  287. p = s;
  288. }
  289. else
  290. {
  291. Node n;
  292. if ((n = s.next) == null) {
  293. break;
  294. }
  295. p.casNext(s, n);
  296. }
  297. }
  298. }
  299.  
  300. private boolean findAndRemove(Object e)
  301. {
  302. if (e != null) {
  303. Node pred = null; for (Node p = this.head; p != null; ) {
  304. Object item = p.item;
  305. if (p.isData) {
  306. if ((item != null) && (item != p) && (e.equals(item)) &&
  307. (p.tryMatchData())) {
  308. unsplice(pred, p);
  309. return true;
  310. }
  311. }
  312. else if (item == null)
  313. break;
  314. pred = p;
  315. if ((p = p.next) == pred) {
  316. pred = null;
  317. p = this.head;
  318. }
  319. }
  320. }
  321. return false;
  322. }
  323.  
  /** Creates a queue with no nodes; {@code head} and {@code tail} start out null. */
  public LinkedTransferQueue() {
    super();
  }
  327.  
  /**
   * Creates a queue and adds every element of {@code c} via {@code addAll}.
   *
   * @param c the elements to add initially
   */
  public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    this.addAll(c);
  }
  333.  
  334. public void put(E e)
  335. {
  336. xfer(e, true, 1, 0L);
  337. }
  338.  
  339. public boolean offer(E e, long timeout, TimeUnit unit)
  340. {
  341. xfer(e, true, 1, 0L);
  342. return true;
  343. }
  344.  
  345. public boolean offer(E e)
  346. {
  347. xfer(e, true, 1, 0L);
  348. return true;
  349. }
  350.  
  351. public boolean add(E e)
  352. {
  353. xfer(e, true, 1, 0L);
  354. return true;
  355. }
  356.  
  357. public boolean tryTransfer(E e)
  358. {
  359. return xfer(e, true, 0, 0L) == null;
  360. }
  361.  
  362. public void transfer(E e)
  363. throws InterruptedException
  364. {
  365. if (xfer(e, true, 2, 0L) != null) {
  366. Thread.interrupted();
  367. throw new InterruptedException();
  368. }
  369. }
  370.  
  371. public boolean tryTransfer(E e, long timeout, TimeUnit unit)
  372. throws InterruptedException
  373. {
  374. if (xfer(e, true, 3, unit.toNanos(timeout)) == null)
  375. return true;
  376. if (!Thread.interrupted())
  377. return false;
  378. throw new InterruptedException();
  379. }
  380.  
  381. public E take() throws InterruptedException {
  382. Object e = xfer(null, false, 2, 0L);
  383. if (e != null)
  384. return e;
  385. Thread.interrupted();
  386. throw new InterruptedException();
  387. }
  388.  
  389. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  390. Object e = xfer(null, false, 3, unit.toNanos(timeout));
  391. if ((e != null) || (!Thread.interrupted()))
  392. return e;
  393. throw new InterruptedException();
  394. }
  395.  
  396. public E poll() {
  397. return xfer(null, false, 0, 0L);
  398. }
  399.  
  400. public int drainTo(Collection<? super E> c)
  401. {
  402. if (c == null)
  403. throw new NullPointerException();
  404. if (c == this)
  405. throw new IllegalArgumentException();
  406. int n = 0;
  407. Object e;
  408. while ((e = poll()) != null)
  409. {
  410. Object e;
  411. c.add(e);
  412. n++;
  413. }
  414. return n;
  415. }
  416.  
  417. public int drainTo(Collection<? super E> c, int maxElements)
  418. {
  419. if (c == null)
  420. throw new NullPointerException();
  421. if (c == this)
  422. throw new IllegalArgumentException();
  423. int n = 0;
  424. Object e;
  425. while ((n < maxElements) && ((e = poll()) != null))
  426. {
  427. Object e;
  428. c.add(e);
  429. n++;
  430. }
  431. return n;
  432. }
  433.  
  434. public Iterator<E> iterator()
  435. {
  436. return new Itr();
  437. }
  438.  
  439. public E peek() {
  440. return firstDataItem();
  441. }
  442.  
  443. public boolean isEmpty()
  444. {
  445. for (Node p = this.head; p != null; p = succ(p)) {
  446. if (!p.isMatched())
  447. return !p.isData;
  448. }
  449. return true;
  450. }
  451.  
  452. public boolean hasWaitingConsumer() {
  453. return firstOfMode(false) != null;
  454. }
  455.  
  456. public int size()
  457. {
  458. return countOfMode(true);
  459. }
  460.  
  461. public int getWaitingConsumerCount() {
  462. return countOfMode(false);
  463. }
  464.  
  465. public boolean remove(Object o)
  466. {
  467. return findAndRemove(o);
  468. }
  469.  
  470. public int remainingCapacity()
  471. {
  472. return 2147483647;
  473. }
  474.  
  475. private void writeObject(ObjectOutputStream s)
  476. throws IOException
  477. {
  478. s.defaultWriteObject();
  479. for (Object e : this) {
  480. s.writeObject(e);
  481. }
  482. s.writeObject(null);
  483. }
  484.  
  485. private void readObject(ObjectInputStream s)
  486. throws IOException, ClassNotFoundException
  487. {
  488. s.defaultReadObject();
  489. while (true) {
  490. Object item = s.readObject();
  491. if (item == null) {
  492. break;
  493. }
  494. offer(item);
  495. }
  496. }
  497.  
  498. static long objectFieldOffset(Unsafe UNSAFE, String field, Class<?> klazz)
  499. {
  500. try
  501. {
  502. return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
  503. }
  504. catch (NoSuchFieldException e) {
  505. NoSuchFieldError error = new NoSuchFieldError(field);
  506. error.initCause(e);
  507. throw error;
  508. }
  509. }
  510.  
  511. static Unsafe getUnsafe()
  512. {
  513. try
  514. {
  515. return Unsafe.getUnsafe();
  516. } catch (SecurityException se) {
  517. try {
  518. return (Unsafe)AccessController.doPrivileged(
  519. new PrivilegedExceptionAction()
  520. {
  521. public Unsafe run() throws Exception {
  522. Field f = Unsafe.class
  523. .getDeclaredField("theUnsafe");
  524. f.setAccessible(true);
  525. return (Unsafe)f.get(null);
  526. } } );
  527. } catch (PrivilegedActionException e) {
  528. throw new RuntimeException("Could not initialize intrinsics",
  529. e.getCause());
  530. }
  531. }
  532. }
  533.  
  534. final class Itr
  535. implements Iterator<E>
  536. {
  537. private LinkedTransferQueue.Node nextNode;
  538. private E nextItem;
  539. private LinkedTransferQueue.Node lastRet;
  540. private LinkedTransferQueue.Node lastPred;
  541.  
  542. private void advance(LinkedTransferQueue.Node prev)
  543. {
  544. this.lastPred = this.lastRet;
  545. this.lastRet = prev;
  546. for (LinkedTransferQueue.Node p = prev == null ? LinkedTransferQueue.this.head : LinkedTransferQueue.this.succ(prev);
  547. p != null; p = LinkedTransferQueue.this.succ(p)) {
  548. Object item = p.item;
  549. if (p.isData) {
  550. if ((item != null) && (item != p)) {
  551. this.nextItem = LinkedTransferQueue.cast(item);
  552. this.nextNode = p;
  553. }
  554. }
  555. else
  556. if (item == null)
  557. break;
  558. }
  559. this.nextNode = null;
  560. }
  561.  
  562. Itr() {
  563. advance(null);
  564. }
  565.  
  566. public final boolean hasNext() {
  567. return this.nextNode != null;
  568. }
  569.  
  570. public final E next() {
  571. LinkedTransferQueue.Node p = this.nextNode;
  572. if (p == null) throw new NoSuchElementException();
  573. Object e = this.nextItem;
  574. advance(p);
  575. return e;
  576. }
  577.  
  578. public final void remove() {
  579. LinkedTransferQueue.Node p = this.lastRet;
  580. if (p == null) throw new IllegalStateException();
  581. if (p.tryMatchData())
  582. LinkedTransferQueue.this.unsplice(this.lastPred, p);
  583. }
  584. }
  585.  
  586. static final class Node
  587. {
  588. final boolean isData;
  589. volatile Object item;
  590. volatile Node next;
  591. volatile Thread waiter;
  592. private static final Unsafe UNSAFE = LinkedTransferQueue.getUnsafe();
  593.  
  594. private static final long nextOffset = LinkedTransferQueue.objectFieldOffset(UNSAFE, "next", Node.class);
  595.  
  596. private static final long itemOffset = LinkedTransferQueue.objectFieldOffset(UNSAFE, "item", Node.class);
  597.  
  598. private static final long waiterOffset = LinkedTransferQueue.objectFieldOffset(UNSAFE, "waiter", Node.class);
  599. private static final long serialVersionUID = -3375979862319811754L;
  600.  
  601. final boolean casNext(Node cmp, Node val)
  602. {
  603. return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  604. }
  605.  
  606. final boolean casItem(Object cmp, Object val) {
  607. assert ((cmp == null) || (cmp.getClass() != Node.class));
  608. return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  609. }
  610.  
  611. Node(Object item, boolean isData)
  612. {
  613. UNSAFE.putObject(this, itemOffset, item);
  614. this.isData = isData;
  615. }
  616.  
  617. final void forgetNext()
  618. {
  619. UNSAFE.putObject(this, nextOffset, this);
  620. }
  621.  
  622. final void forgetContents()
  623. {
  624. UNSAFE.putObject(this, itemOffset, this);
  625. UNSAFE.putObject(this, waiterOffset, null);
  626. }
  627.  
  628. final boolean isMatched()
  629. {
  630. Object x = this.item;
  631. if (x != this) if ((x == null) != this.isData) return false;
  632. return true;
  633. }
  634.  
  635. final boolean isUnmatchedRequest()
  636. {
  637. return (!this.isData) && (this.item == null);
  638. }
  639.  
  640. final boolean cannotPrecede(boolean haveData)
  641. {
  642. boolean d = this.isData;
  643. Object x;
  644. if ((d != haveData) && ((x = this.item) != this)) if ((x != null) == d) return true;
  645. return false;
  646. }
  647.  
  648. final boolean tryMatchData()
  649. {
  650. assert (this.isData);
  651. Object x = this.item;
  652. if ((x != null) && (x != this) && (casItem(x, null))) {
  653. LockSupport.unpark(this.waiter);
  654. return true;
  655. }
  656. return false;
  657. }
  658. }
  659. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement