Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2016
66
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.39 KB | None | 0 0
  /**
   * Slow-path insert of {@code e} at producer position {@code index}.
   *
   * <p>Behaviour depends on the sign of {@code lookAheadStep}:
   * <ul>
   *   <li>positive: probe the slot {@code lookAheadStep} positions ahead; if it is empty the
   *       producer limit is advanced and the element is written. Otherwise the element is
   *       written into the current buffer if there is still room, or a larger buffer is
   *       allocated and linked in (unless the buffer is already at {@code maxQueueCapacity},
   *       in which case the offer fails when the target slot is occupied);</li>
   *   <li>zero or negative: the consumer index is read to compute the current size; the
   *       offer fails only when that size equals the buffer capacity ({@code mask + 1}).</li>
   * </ul>
   *
   * @param buffer the buffer the producer is currently writing to
   * @param mask   index mask of {@code buffer}; {@code mask + 1} is treated as its capacity
   * @param e      the element to insert
   * @param index  the producer index at which {@code e} is to be stored
   * @param offset offset passed to {@code lvElement}/{@code writeToQueue} for this insert;
   *               presumably the element offset of {@code index} in {@code buffer} - confirm
   *               against the caller
   * @return {@code true} if the element was written (or handed to {@code linkOldToNew}),
   *         {@code false} if the queue is full and cannot grow
   */
  1. protected final boolean offerColdPath(final E[] buffer, final long mask, final E e, final long index,
  2. final long offset) {
  3. final long lookAheadStep = this.lookAheadStep;
  4. // normal case, go around the buffer or resize if full (unless we hit max capacity)
  5. if (lookAheadStep > 0) {
  6. long lookAheadElementOffset = calcElementOffset(index + lookAheadStep, mask);
  7. // Try and look ahead a number of elements so we don't have to do this all the time
  8. if (null == lvElement(buffer, lookAheadElementOffset)) {
  9. producerBufferLimit = index + lookAheadStep - 1; // joy, there's plenty of room
  10. writeToQueue(buffer, e, index, offset);
  11. return true;
  12. }
  13. // we're at max capacity, can use up last element
  14. final int maxCapacity = maxQueueCapacity;
  15. if (mask + 1 == maxCapacity) {
      // the buffer cannot grow any further: succeed only if the target slot itself is empty
  16. if (null == lvElement(buffer, offset)) {
  17. writeToQueue(buffer, e, index, offset);
  18. return true;
  19. }
  20. // we're full and can't grow
  21. return false;
  22. }
  23. // not at max capacity, so must allow extra slot for JUMP
  24. if (null == lvElement(buffer, calcElementOffset(index + 1, mask))) { // buffer is not full
  25. writeToQueue(buffer, e, index, offset);
  26. } else {
  27. // allocate a new buffer with double the capacity, plus one extra array slot
      // NOTE(review): the extra slot is presumably used to link buffers - confirm in linkOldToNew
  28. final E[] newBuffer = allocate((int) (2*(mask +1) + 1));
  29.  
  30. producerBuffer = newBuffer;
  31. producerMask = newBuffer.length - 2; // length is 2*(mask+1)+1, so the new mask is 2*(mask+1)-1
  32.  
  33. final long offsetInNew = calcElementOffset(index, producerMask);
      // the element is passed to linkOldToNew rather than written via writeToQueue on this path
  34. linkOldToNew(index, buffer, offset, newBuffer, offsetInNew, e);
  35.  
  36. int newCapacity = (int) (producerMask + 1);
  37. if (newCapacity == maxCapacity) {
  38. long currConsumerIndex = lvConsumerIndex();
  39. // use lookAheadStep to store the consumer distance from final buffer
      // (stored negated, so subsequent calls take the non-positive branch below)
  40. this.lookAheadStep = -(index - currConsumerIndex);
      // NOTE(review): the non-positive branch below sets the limit without the "- 1" - confirm intended
  41. producerBufferLimit = currConsumerIndex + maxCapacity - 1;
  42. } else {
  43. producerBufferLimit = index + producerMask - 1;
  44. adjustLookAheadStep(newCapacity);
  45. }
  46. }
  47. return true;
  48. }
  49. // the step is negative (or zero) in the period between allocating the max sized buffer and the
  50. // consumer starting on it
  51. else {
  52. final long prevElementsInOtherBuffers = -lookAheadStep;
  53. // until the consumer starts using the current buffer we need to check consumer index to
  54. // verify size
  55. long currConsumerIndex = lvConsumerIndex();
  56. int size = (int) (index - currConsumerIndex);
  57. int maxCapacity = (int) mask+1; // we're on max capacity or we wouldn't be here
  58. if (size == maxCapacity) {
  59. // consumer index has not changed since adjusting the lookAhead index, we're full
  60. return false;
  61. }
  62. // if consumerIndex progressed enough so that current size indicates it is on same buffer
  63. long firstIndexInCurrentBuffer = producerBufferLimit - maxCapacity + prevElementsInOtherBuffers;
  64. if (currConsumerIndex >= firstIndexInCurrentBuffer) {
  65. // job done, we've now settled into our final state
  66. adjustLookAheadStep(maxCapacity);
  67. }
  68. // consumer is still on some other buffer
  69. else {
  70. // how many elements out of buffer?
      // negative in this branch (consumer index is below firstIndexInCurrentBuffer), so the
      // next call lands in this non-positive path again
  71. this.lookAheadStep = (int) (currConsumerIndex - firstIndexInCurrentBuffer);
  72. }
  73. producerBufferLimit = currConsumerIndex + maxCapacity;
  74. writeToQueue(buffer, e, index, offset);
  75. return true;
  76. }
  77. }
  78.  
  79. private void adjustLookAheadStep(int capacity) {
  80. lookAheadStep = Math.min(capacity / 4, SpscArrayQueue.MAX_LOOK_AHEAD_STEP);
  81. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement