Advertisement
Guest User

Untitled

a guest
Mar 25th, 2024
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 7.82 KB | None | 0 0
  1. using System.Diagnostics;
  2. using System.Threading;
  3. using Pathfinding.Util;
  4. using UnityEngine.Assertions;
  5.  
  6. namespace Pathfinding {
  7.     /** Multi-producer-multi-consumer (MPMC) channel.
  8.      *
  9.      * This is a channel that can be used to send data between threads.
  10.      * It is thread safe and can be used by multiple threads at the same time.
  11.      *
  12.      * Additionally, the channel can be put into a blocking mode, which will cause all calls to Receive to block until the channel is unblocked.
  13.      */
  14.     internal class BlockableChannel<T> where T : class {
  15.         public enum PopState {
  16.             Ok,
  17.             Wait,
  18.             Closed,
  19.         }
  20.  
  21.         readonly System.Object lockObj = new System.Object();
  22.  
  23.         CircularBuffer<T> queue = new CircularBuffer<T>(16);
  24.         public int numReceivers { get; private set; }
  25.  
  26.         // Marked as volatile such that the compiler will not try to optimize the allReceiversBlocked property too much (this is more of a theoretical concern than a practical issue).
  27.         volatile int waitingReceivers;
  28. #if !UNITY_WEBGL
  29.         ManualResetEvent starving = new ManualResetEvent(false);
  30. #endif
  31.         bool blocked;
  32.  
  33.         /** True if #Close has been called */
  34.         public bool isClosed { get; private set; }
  35.  
  36.         /** True if the queue is empty */
  37.         public bool isEmpty {
  38.             get {
  39.                 lock (lockObj) {
  40.                     return queue.Length == 0;
  41.                 }
  42.             }
  43.         }
  44.  
  45.         /** True if blocking and all receivers are waiting for unblocking */
  46.         // Note: This is designed to be lock-free for performance. But it will only generate a useful value if called from the same thread that is blocking/unblocking the queue, otherwise the return value could become invalid at any time.
  47.         public bool allReceiversBlocked => blocked && waitingReceivers == numReceivers;
  48.  
  49.         /** If true, all calls to Receive will block until this property is set to false */
  50.         public bool isBlocked {
  51.             get => blocked;
  52.             set {
  53.                 lock (lockObj) {
  54.                     blocked = value;
  55.                     if (isClosed) return;
  56.                     isStarving = value || queue.Length == 0;
  57.                 }
  58.             }
  59.         }
  60.  
  61.         /** All calls to Receive and ReceiveNoBlock will now return PopState.Closed */
  62.         public void Close () {
  63.             lock (lockObj) {
  64.                 System.Console.WriteLine($"Closing channel globally at {(double)(1000*System.Diagnostics.Stopwatch.GetTimestamp())/(double)System.Diagnostics.Stopwatch.Frequency} ms");
  65.                 isClosed = true;
  66.                 isStarving = false;
  67.             }
  68.         }
  69.  
  70.         bool isStarving {
  71.             get {
  72. #if UNITY_WEBGL
  73.                 // In WebGL, semaphores are not supported.
  74.                 // They will compile, but they don't work properly.
  75.                 // So instead we directly use what the starving semaphore should indicate.
  76.                 return (blocked || queue.Length == 0) && !isClosed;
  77. #else
  78.                 return !starving.WaitOne(0);
  79. #endif
  80.             }
  81.             set {
  82. #if !UNITY_WEBGL
  83.                 if (value) starving.Reset();
  84.                 else starving.Set();
  85. #endif
  86.             }
  87.         }
  88.  
  89.         /** Resets a closed channel so that it can be used again.
  90.          *
  91.          * The existing queue is preserved.
  92.          *
  93.          * This will throw an exception if there are any receivers still active.
  94.          */
  95.         public void Reopen () {
  96.             lock (lockObj) {
  97.                 if (numReceivers != 0) throw new System.InvalidOperationException("Can only reopen a channel after Close has been called on all receivers");
  98.                 Assert.AreEqual(waitingReceivers, 0);
  99.                 isClosed = false;
  100.                 isBlocked = false;
  101.             }
  102.         }
  103.  
  104.         public Receiver AddReceiver () {
  105.             lock (lockObj) {
  106.                 if (isClosed) throw new System.InvalidOperationException("Channel is closed");
  107.                 this.numReceivers++;
  108.                 return new Receiver(this);
  109.             }
  110.         }
  111.  
  112.         /** Push a path to the front of the queue */
  113.         public void PushFront (T path) {
  114.             lock (lockObj) {
  115.                 if (isClosed) throw new System.InvalidOperationException("Channel is closed");
  116.                 queue.PushStart(path);
  117.                 if (!blocked) isStarving = false;
  118.             }
  119.         }
  120.  
  121.         /** Push a path to the end of the queue */
  122.         public void Push (T path) {
  123.             lock (lockObj) {
  124.                 if (isClosed) throw new System.InvalidOperationException("Channel is closed");
  125.                 queue.PushEnd(path);
  126.                 if (!blocked) isStarving = false;
  127.             }
  128.         }
  129.  
  130.         /** Allows receiving items from a channel */
  131.         public struct Receiver {
  132.             BlockableChannel<T> channel;
  133.  
  134.             public Receiver(BlockableChannel<T> channel) {
  135.                 this.channel = channel;
  136.             }
  137.  
  138.             /** Call when a receiver was terminated.
  139.              *
  140.              * After this call, this receiver cannot be used for anything.
  141.              */
  142.             public void Close () {
  143.                 lock (channel.lockObj) {
  144.                     Assert.IsTrue(channel.numReceivers > 0);
  145.                     Assert.IsTrue(channel.waitingReceivers < channel.numReceivers);
  146.                     channel.numReceivers--;
  147.                     System.Console.WriteLine($"{System.Environment.CurrentManagedThreadId} Closing channel. {channel.numReceivers} receivers left. at  {(double)(1000*System.Diagnostics.Stopwatch.GetTimestamp())/(double)System.Diagnostics.Stopwatch.Frequency} ms");
  148.                 }
  149.                 channel = null;
  150.             }
  151.  
  152.             /** Receives the next item from the channel.
  153.             * This call will block if there are no items in the channel or if the channel is currently blocked.
  154.             *
  155.             * \returns PopState.Ok and a non-null item in the normal case. Returns PopState.Closed if the channel has been closed.
  156.             */
  157.             public PopState Receive (out T item) {
  158. #if UNITY_WEBGL
  159.                 throw new System.Exception("Cannot block in WebGL. Use ReceiveNoBlock instead.");
  160. #else
  161.                 Interlocked.Increment(ref channel.waitingReceivers);
  162.                 while (true) {
  163.                     System.Console.WriteLine($"{System.Environment.CurrentManagedThreadId} Starving... at  {(double)(1000*System.Diagnostics.Stopwatch.GetTimestamp())/(double)System.Diagnostics.Stopwatch.Frequency} ms");
  164.                     channel.starving.WaitOne();
  165.                     System.Console.WriteLine($"{System.Environment.CurrentManagedThreadId} Locking... at  {(double)(1000*System.Diagnostics.Stopwatch.GetTimestamp())/(double)System.Diagnostics.Stopwatch.Frequency} ms");
  166.                     // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
  167.                     lock (channel.lockObj) {
  168.                         if (channel.isClosed) {
  169.                             System.Console.WriteLine($"{System.Environment.CurrentManagedThreadId} Closing...");
  170.                             Interlocked.Decrement(ref channel.waitingReceivers);
  171.                             item = null;
  172.                             return PopState.Closed;
  173.                         }
  174.                         if (channel.queue.Length == 0) channel.isStarving = true;
  175.                         if (channel.isStarving) continue;
  176.                         Assert.IsFalse(channel.blocked);
  177.                         Interlocked.Decrement(ref channel.waitingReceivers);
  178.                         item = channel.queue.PopStart();
  179.                         System.Console.WriteLine($"{System.Environment.CurrentManagedThreadId} Received... at  {(double)(1000*System.Diagnostics.Stopwatch.GetTimestamp())/(double)System.Diagnostics.Stopwatch.Frequency} ms");
  180.                         return PopState.Ok;
  181.                     }
  182.                 }
  183. #endif
  184.             }
  185.  
  186.             /** Receives the next item from the channel, this call will not block.
  187.             * To ensure a consistent state, the caller must follow this pattern.
  188.             * 1. Call ReceiveNoBlock(false), if PopState.Wait is returned, wait for a bit (e.g yield return null in a Unity coroutine)
  189.             * 2. try again with PopNoBlock(true), if PopState.Wait, wait for a bit
  190.             * 3. Repeat from step 2.
  191.             */
  192.             public PopState ReceiveNoBlock (bool blockedBefore, out T item) {
  193.                 item = null;
  194.                 if (!blockedBefore) Interlocked.Increment(ref channel.waitingReceivers);
  195.                 while (true) {
  196.                     if (channel.isStarving) return PopState.Wait;
  197.                     // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
  198.                     lock (channel.lockObj) {
  199.                         if (channel.isClosed) {
  200.                             Interlocked.Decrement(ref channel.waitingReceivers);
  201.                             return PopState.Closed;
  202.                         }
  203.                         if (channel.queue.Length == 0) channel.isStarving = true;
  204.                         if (channel.isStarving) continue;
  205.                         Assert.IsFalse(channel.blocked);
  206.                         Interlocked.Decrement(ref channel.waitingReceivers);
  207.                         item = channel.queue.PopStart();
  208.                         return PopState.Ok;
  209.                     }
  210.                 }
  211.             }
  212.         }
  213.     }
  214. }
  215.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement