ItzWarty

[Untested] Lock-Free Single Producer + Single Consumer Queue

Nov 23rd, 2014
199
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 3.67 KB | None | 0 0
  1. using System.Collections;
  2. using System.Collections.Generic;
  3.  
  4. namespace ItzWarty.Collections {
  5.    public class SingleProducerConsumerConcurrentQueue<T> : IConcurrentQueue<T> where T : class {
  6.       public const int kBucketSize = 32;
  7.  
  8.       private class Segment<T> where T : class {
  9.          public Box<T>[] elements;
  10.          public volatile Segment<T> next;
  11.          public int readIndex;
  12.          public int writeIndex;
  13.  
  14.          public Segment(Box<T>[] elements, Segment<T> next ) {
  15.             this.elements = elements;
  16.             this.next = next;
  17.          }
  18.  
  19.          public bool IsSentinel { get { return elements == null; } }
  20.  
  21.          public class Box<T> where T : class {
  22.             public volatile T value;
  23.          }
  24.       }
  25.  
  26.       private readonly Segment<T> sentinelEnd = new Segment<T>(null, null);
  27.       private Segment<T> head;
  28.       private Segment<T> tail;
  29.  
  30.       public SingleProducerConsumerConcurrentQueue() {
  31.          head = CreateSegment();
  32.          tail = head;
  33.       }
  34.  
  35.       private Segment<T> CreateSegment() {
  36.          return new Segment<T>(Util.Generate(kBucketSize, i => new Segment<T>.Box<T>()), sentinelEnd);
  37.       }
  38.  
  39.       public void Enqueue(T item) {
  40.          tail.elements[tail.writeIndex++].value = item;
  41.          if (tail.writeIndex == kBucketSize) {
  42.             var newSegment = CreateSegment();
  43.             tail.next = newSegment;
  44.             tail = newSegment;
  45.          }
  46.       }
  47.  
  48.       public bool TryPeek(out T result) {
  49.          AdvanceHeadPastSegmentEnd();
  50.  
  51.          var index = head.readIndex;
  52.          if (index == kBucketSize) {
  53.             result = null;
  54.             return false;
  55.          } else {
  56.             result = head.elements[index].value;
  57.             return true;
  58.          }
  59.       }
  60.  
  61.       public bool TryDequeue(out T result) {
  62.          AdvanceHeadPastSegmentEnd();
  63.          
  64.          var index = head.readIndex;
  65.          if (index == kBucketSize) {
  66.             result = null;
  67.             return false;
  68.          } else {
  69.             var element = head.elements[index];
  70.             head.readIndex++;
  71.  
  72.             result = element.value;
  73.             element.value = null;
  74.             return true;
  75.          }
  76.       }
  77.  
  78.       private void AdvanceHeadPastSegmentEnd() {
  79.          var index = head.readIndex;
  80.          if (index == kBucketSize) {
  81.             if (head.next != sentinelEnd) {
  82.                head = head.next;
  83.             }
  84.          }
  85.       }
  86.  
  87.       public void CopyTo(T[] array, int index) {
  88.          foreach (var item in this) {
  89.             array[index++] = item;
  90.          }
  91.       }
  92.  
  93.       public T[] ToArray() {
  94.          var list = new List<T>();
  95.          foreach (var item in this) {
  96.             list.Add(item);
  97.          }
  98.          return list.ToArray();
  99.       }
  100.  
  101.       public int Count { get { return ComputeCount(); } }
  102.       public bool IsEmpty { get { return this.None(); } }
  103.  
  104.       private int ComputeCount() {
  105.          var count = 0;
  106.          var currentSegment = head;
  107.          while (currentSegment != tail) {
  108.             count += kBucketSize - currentSegment.readIndex;
  109.             currentSegment = currentSegment.next;
  110.          }
  111.          count += tail.writeIndex;
  112.          return count;
  113.       }
  114.  
  115.       public IEnumerator<T> GetEnumerator() {
  116.          var currentSegment = head;
  117.          while (currentSegment != sentinelEnd) {
  118.             for (var i = currentSegment.readIndex; i < kBucketSize; i++) {
  119.                yield return currentSegment.elements[i].value;
  120.             }
  121.             currentSegment = currentSegment.next;
  122.          }
  123.       }
  124.  
  125.       IEnumerator IEnumerable.GetEnumerator() {
  126.          return GetEnumerator();
  127.       }
  128.    }
  129. }
Add Comment
Please, Sign In to add comment