Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System.Diagnostics;
- using System.Threading;
- using Pathfinding.Util;
- using UnityEngine.Assertions;
- namespace Pathfinding {
/** Multi-producer-multi-consumer (MPMC) channel.
 *
 * This is a channel that can be used to send data between threads.
 * It is thread safe and can be used by multiple threads at the same time.
 *
 * Additionally, the channel can be put into a blocking mode, which will cause all calls to Receive to block until the channel is unblocked.
 *
 * Internally all queue mutations are guarded by a single lock, and (outside WebGL) a ManualResetEvent named
 * `starving` is kept in the *set* state exactly when receivers are allowed to try to pop an item,
 * i.e. when the channel is closed, or when it is not blocked and the queue was last seen non-empty.
 */
internal class BlockableChannel<T> where T : class {
    /** Result of a receive operation */
    public enum PopState {
        /** An item was received */
        Ok,
        /** No item could be received right now (channel is empty or blocked). Only returned by ReceiveNoBlock */
        Wait,
        /** The channel has been closed, no item was received */
        Closed,
    }

    /** Guards #queue, #blocked, #isClosed, #numReceivers and the state of the starving event */
    readonly System.Object lockObj = new System.Object();

    // Intentionally not readonly: CircularBuffer<T> is declared elsewhere and may be a mutable value type,
    // in which case a readonly field would cause mutations to operate on a copy.
    CircularBuffer<T> queue = new CircularBuffer<T>(16);

    /** Number of receivers that have been added using #AddReceiver and not yet closed */
    public int numReceivers { get; private set; }

    // Marked as volatile such that the compiler will not try to optimize the allReceiversBlocked property too much (this is more of a theoretical concern than a practical issue).
    volatile int waitingReceivers;

#if !UNITY_WEBGL
    /** Signaled (set) when receivers may try to pop an item, non-signaled (reset) when they must wait */
    readonly ManualResetEvent starving = new ManualResetEvent(false);
#endif

    bool blocked;

    /** True if #Close has been called */
    public bool isClosed { get; private set; }

    /** True if the queue is empty */
    public bool isEmpty {
        get {
            lock (lockObj) {
                return queue.Length == 0;
            }
        }
    }

    /** True if blocking and all receivers are waiting for unblocking */
    // Note: This is designed to be lock-free for performance. But it will only generate a useful value if called from the same thread that is blocking/unblocking the queue, otherwise the return value could become invalid at any time.
    public bool allReceiversBlocked => blocked && waitingReceivers == numReceivers;

    /** If true, all calls to Receive will block until this property is set to false */
    public bool isBlocked {
        get => blocked;
        set {
            lock (lockObj) {
                blocked = value;
                // A closed channel must keep its receivers awake so that they can observe the closed state
                if (isClosed) return;
                isStarving = value || queue.Length == 0;
            }
        }
    }

    /** Writes a diagnostic message to the Unity console.
     *
     * Calls to this method (including the evaluation of their arguments) are removed by the compiler
     * unless the BLOCKABLE_CHANNEL_DEBUG symbol is defined. This keeps string formatting and logging
     * off the hot receive path, and out of the critical sections, in normal builds.
     */
    [System.Diagnostics.Conditional("BLOCKABLE_CHANNEL_DEBUG")]
    static void Trace (string message) {
        UnityEngine.Debug.Log(message);
    }

    /** Current high resolution timestamp in milliseconds. Only used for diagnostic messages */
    static double TimestampMs () {
        return (double)(1000*System.Diagnostics.Stopwatch.GetTimestamp())/(double)System.Diagnostics.Stopwatch.Frequency;
    }

    /** All calls to Receive and ReceiveNoBlock will now return PopState.Closed */
    public void Close () {
        lock (lockObj) {
            Trace($"Closing channel globally at {TimestampMs()} ms");
            isClosed = true;
            // Wake up all waiting receivers so that they can return PopState.Closed
            isStarving = false;
        }
    }

    /** True if receivers currently have to wait before they can try to receive an item.
     *
     * Outside WebGL this mirrors the state of the starving event (starving == event is not signaled).
     * In WebGL the setter does nothing and the getter computes the value directly from the channel state.
     */
    bool isStarving {
        get {
#if UNITY_WEBGL
            // In WebGL, semaphores are not supported.
            // They will compile, but they don't work properly.
            // So instead we directly use what the starving semaphore should indicate.
            return (blocked || queue.Length == 0) && !isClosed;
#else
            // WaitOne(0) polls the event without blocking
            return !starving.WaitOne(0);
#endif
        }
        set {
#if !UNITY_WEBGL
            if (value) starving.Reset();
            else starving.Set();
#endif
        }
    }

    /** Resets a closed channel so that it can be used again.
     *
     * The existing queue is preserved.
     * The channel will be unblocked after this call.
     *
     * This will throw an InvalidOperationException if there are any receivers still active.
     */
    public void Reopen () {
        lock (lockObj) {
            if (numReceivers != 0) throw new System.InvalidOperationException("Can only reopen a channel after Close has been called on all receivers");
            Assert.AreEqual(waitingReceivers, 0);
            isClosed = false;
            // Must be done after isClosed is reset, since the isBlocked setter does not touch the starving state of a closed channel
            isBlocked = false;
        }
    }

    /** Registers a new receiver for this channel.
     *
     * The returned receiver must be closed using Receiver.Close when it is no longer used.
     *
     * Throws an InvalidOperationException if the channel is closed.
     */
    public Receiver AddReceiver () {
        lock (lockObj) {
            if (isClosed) throw new System.InvalidOperationException("Channel is closed");
            this.numReceivers++;
            return new Receiver(this);
        }
    }

    /** Push an item to the front of the queue.
     *
     * Throws an InvalidOperationException if the channel is closed.
     */
    public void PushFront (T path) {
        lock (lockObj) {
            if (isClosed) throw new System.InvalidOperationException("Channel is closed");
            queue.PushStart(path);
            // Wake up receivers, unless they are supposed to be held back by the blocking mode
            if (!blocked) isStarving = false;
        }
    }

    /** Push an item to the end of the queue.
     *
     * Throws an InvalidOperationException if the channel is closed.
     */
    public void Push (T path) {
        lock (lockObj) {
            if (isClosed) throw new System.InvalidOperationException("Channel is closed");
            queue.PushEnd(path);
            // Wake up receivers, unless they are supposed to be held back by the blocking mode
            if (!blocked) isStarving = false;
        }
    }

    /** Allows receiving items from a channel */
    public struct Receiver {
        BlockableChannel<T> channel;

        public Receiver(BlockableChannel<T> channel) {
            this.channel = channel;
        }

        /** Call when a receiver was terminated.
         *
         * After this call, this receiver cannot be used for anything.
         */
        public void Close () {
            lock (channel.lockObj) {
                Assert.IsTrue(channel.numReceivers > 0);
                // A receiver must not be closed while it is counted as waiting inside a receive call
                Assert.IsTrue(channel.waitingReceivers < channel.numReceivers);
                channel.numReceivers--;
                Trace($"{System.Environment.CurrentManagedThreadId} Closing channel. {channel.numReceivers} receivers left. at {TimestampMs()} ms");
            }
            channel = null;
        }

        /** Receives the next item from the channel.
         * This call will block if there are no items in the channel or if the channel is currently blocked.
         *
         * Not supported in WebGL, where this method always throws a NotSupportedException. Use #ReceiveNoBlock there instead.
         *
         * \returns PopState.Ok and a non-null item in the normal case. Returns PopState.Closed if the channel has been closed.
         */
        public PopState Receive (out T item) {
#if UNITY_WEBGL
            throw new System.NotSupportedException("Cannot block in WebGL. Use ReceiveNoBlock instead.");
#else
            // This receiver counts as waiting from now until it returns
            Interlocked.Increment(ref channel.waitingReceivers);
            while (true) {
                Trace($"{System.Environment.CurrentManagedThreadId} Starving... at {TimestampMs()} ms");
                channel.starving.WaitOne();
                Trace($"{System.Environment.CurrentManagedThreadId} Locking... at {TimestampMs()} ms");
                // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
                lock (channel.lockObj) {
                    if (channel.isClosed) {
                        Trace($"{System.Environment.CurrentManagedThreadId} Closing...");
                        Interlocked.Decrement(ref channel.waitingReceivers);
                        item = null;
                        return PopState.Closed;
                    }
                    // Another receiver may have taken the last item, in that case everyone has to go back to waiting
                    if (channel.queue.Length == 0) channel.isStarving = true;
                    if (channel.isStarving) continue;
                    Assert.IsFalse(channel.blocked);
                    Interlocked.Decrement(ref channel.waitingReceivers);
                    item = channel.queue.PopStart();
                    Trace($"{System.Environment.CurrentManagedThreadId} Received... at {TimestampMs()} ms");
                    return PopState.Ok;
                }
            }
#endif
        }

        /** Receives the next item from the channel, this call will not block.
         * To ensure a consistent state, the caller must follow this pattern.
         * 1. Call ReceiveNoBlock(false, out item), if PopState.Wait is returned, wait for a bit (e.g yield return null in a Unity coroutine)
         * 2. try again with ReceiveNoBlock(true, out item), if PopState.Wait, wait for a bit
         * 3. Repeat from step 2.
         *
         * The receiver is counted as waiting from the first call (blockedBefore=false) until a call returns
         * PopState.Ok or PopState.Closed. Passing blockedBefore=true on retries prevents it from being counted twice.
         *
         * \param blockedBefore Must be true if the previous call on this receiver returned PopState.Wait, and false otherwise.
         * \param item The received item if PopState.Ok is returned, otherwise null.
         *
         * \returns PopState.Ok if an item was received, PopState.Wait if the channel is empty or blocked, and PopState.Closed if the channel has been closed.
         */
        public PopState ReceiveNoBlock (bool blockedBefore, out T item) {
            item = null;
            if (!blockedBefore) Interlocked.Increment(ref channel.waitingReceivers);
            while (true) {
                // Stay counted as a waiting receiver, the caller is expected to retry with blockedBefore=true
                if (channel.isStarving) return PopState.Wait;
                // Note that right here, the channel could become blocked, closed or anything really. We have to check conditions again after locking.
                lock (channel.lockObj) {
                    if (channel.isClosed) {
                        Interlocked.Decrement(ref channel.waitingReceivers);
                        return PopState.Closed;
                    }
                    if (channel.queue.Length == 0) channel.isStarving = true;
                    // Loops back to the starving check at the top, which will return PopState.Wait
                    if (channel.isStarving) continue;
                    Assert.IsFalse(channel.blocked);
                    Interlocked.Decrement(ref channel.waitingReceivers);
                    item = channel.queue.PopStart();
                    return PopState.Ok;
                }
            }
        }
    }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement