If you're excited as I am about
Visual Studio Asynchronous Programming, you may already be using a iterator method (such as
Asynchronous methods, C# iterators, and Tasks) to simulate user-space threading. But once you have shared-state concurrency, you may want
mutual exclusion.
Generally the performance recommendation for shared-state concurrency is to only take locks for a short period, especially not across an asynchronous operation (there is a correctness slant as well: minimizing lock time minimizes the risk of deadlock due to different lock acquisition orders). But sometimes you want to serialize asynchronous work - for example, lazy asynchronous initialization of a resource or serializing writes to a TCP network stream.
C# has the lock keyword allowing mutual exclusion using any reference object as the gate. In this example, I start 10 tasks that perform asynchronous "work" protected by mutual exclusion. This work is simulated by waiting 10 milliseconds with a timer, but it could represent any sort of asynchronous effort, such as i/o.
private object _gate = new object();
private IEnumerable<Task> _TestMutexFail()
{
lock (_gate)
{
using (var task_wait = Task.Factory.StartNewDelayed(10))
{
yield return task_wait;
task_wait.Wait();
}
}
}
private Task TestMutexFail()
{
return Task.Factory.Iterate(_TestMutexFail());
}
[TestMethod]
public void TestAsyncMutexFail()
{
var tasks = Enumerable.Range(0, 10).Select(i => TestMutexFail()).ToArray();
Task.WaitAll(tasks);
}
But since the thread running the timer callback is likely not the same thread that initiated the timer (through StartNewDelayed), you'll see a System.Threading.SynchronizationLockException
with the message "Object synchronization method was called from an unsynchronized block of code." This is the same result you would also expect where an i/o completion port thread executes the continuation that represents the completion of the i/o task initiated by your asynchronous operation.
So what is a solution? I've had good success with a simple class such as the following.
public sealed class AsyncMutex
{
public Task<IDisposable> AcquireAsync()
{
var task_completion_source = new TaskCompletionSource<IDisposable>();
lock (_mutex)
{
bool can_obtain_lock = (_waiters.Count == 0);
_waiters.Enqueue(task_completion_source);
if (can_obtain_lock)
{
_PassTheLock();
}
}
return task_completion_source.Task;
}
public void Release()
{
lock (_mutex)
{
_waiters.Dequeue();
_PassTheLock();
}
}
public void Cancel()
{
lock (_mutex)
{
foreach (var waiter in _waiters)
{
waiter.TrySetCanceled();
}
_waiters.Clear();
}
}
public object SynchronousMutex
{
get
{
return _mutex;
}
}
private void _PassTheLock()
{
lock (_mutex)
{
bool have_waiters = (_waiters.Count > 0);
if (have_waiters)
{
var next_acquirer = _waiters.First();
next_acquirer.TrySetResult(new AsyncLock(this));
}
}
}
private sealed class AsyncLock : IDisposable
{
private readonly AsyncMutex _async_mutex;
public AsyncLock(AsyncMutex async_mutex)
{
_async_mutex = async_mutex;
}
public void Dispose()
{
_async_mutex.Release();
}
}
private readonly object _mutex = new object();
private readonly Queue<TaskCompletionSource<IDisposable>> _waiters = new Queue<TaskCompletionSource<IDisposable>>();
}
This AsynchronousMutex which enables mutual exclusion across asynchronous tasks. It fits well with the iterator-based methods for composing asynchronous tasks.
private AsyncMutex _mutex = new AsyncMutex();
private int _concurrency_count = 0;
private IEnumerable<Task> _TestAsyncMutexPass()
{
using (var task_acquire = _mutex.AcquireAsync())
{
yield return task_acquire;
using (task_acquire.Result)
{
Assert.AreEqual(1, Interlocked.Increment(ref _concurrency_count));
using (var task_wait = Task.Factory.StartNewDelayed(10))
{
yield return task_wait;
task_wait.Wait();
}
Assert.AreEqual(0, Interlocked.Decrement(ref _concurrency_count));
}
}
}
private Task TestAsyncMutexPass()
{
return Task.Factory.Iterate(_TestAsyncMutexPass());
}
[TestMethod]
public void TestAsyncMutex()
{
var tasks = Enumerable.Range(0, 10).Select(i => TestAsyncMutexPass()).ToArray();
Task.WaitAll(tasks);
}
By the way, if you're just using a Task to lazily initialize a resource, I can recommend the
ParallelExtensionsExtras Tour - #12 - AsyncCache blog post as another method to accomplish the same goal.