using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
namespace AsyncTest
{
/// Provides a SynchronizationContext that's single-threaded.
public class SingleThreadSynchronizationContext : SynchronizationContext
{
/// The queue of work items.
private readonly BlockingCollection> m_queue =
new BlockingCollection>();
/// The processing thread.
private readonly Thread m_thread = Thread.CurrentThread;
/// Dispatches an asynchronous message to the synchronization context.
/// The System.Threading.SendOrPostCallback delegate to call.
/// The object passed to the delegate.
public override void Post(SendOrPostCallback d, object state)
{
if (d == null) throw new ArgumentNullException("d");
m_queue.Add(new KeyValuePair(d, state));
}
/// Not supported.
public override void Send(SendOrPostCallback d, object state)
{
throw new NotSupportedException("Synchronously sending is not supported.");
}
/// Runs an loop to process all queued work items.
public void RunOnCurrentThread()
{
foreach (var workItem in m_queue.GetConsumingEnumerable())
workItem.Key(workItem.Value);
}
/// Notifies the context that no more work will arrive.
public void Complete() { m_queue.CompleteAdding(); }
}
class Program
{
public static void Run(Func func)
{
var prevCtx = SynchronizationContext.Current;
try
{
var syncCtx = new SingleThreadSynchronizationContext();
//var syncCtx = new System.Windows.Threading.DispatcherSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(syncCtx);
var t = func();
t.ContinueWith(
delegate { syncCtx.Complete(); }, TaskScheduler.Default);
syncCtx.RunOnCurrentThread();
//var frame = new System.Windows.Threading.DispatcherFrame();
//t.ContinueWith(_ => { frame.Continue = false; },
// TaskScheduler.Default);
//System.Windows.Threading.Dispatcher.PushFrame(frame);
t.GetAwaiter().GetResult();
}
finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
}
static async Task TestStuff(int nb)
{
Console.WriteLine("Entering teststuff " + Thread.CurrentThread.ManagedThreadId);
using (var scope = new TransactionScope())
{
int wait = 1000 + nb;
await Task.Delay(wait);
scope.Complete();
}
Console.WriteLine("Leaving teststuff " + Thread.CurrentThread.ManagedThreadId);
}
static void Main(string[] args)
{
Run(async delegate
{
var tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = TestStuff(i);
}
await Task.WhenAll(tasks);
});
Console.ReadLine();
}
}
}