Generally the performance recommendation for shared-state concurrency is to only take locks for a short period, especially not across an asynchronous operation (there is a correctness slant as well: minimizing lock time minimizes the risk of deadlock due to different lock acquisition orders). But sometimes you want to serialize asynchronous work - for example, lazy asynchronous initialization of a resource or serializing writes to a TCP network stream.
C# has the lock keyword allowing mutual exclusion using any reference object as the gate. In this example, I start 10 tasks that perform asynchronous "work" protected by mutual exclusion. This work is simulated by waiting 10 milliseconds with a timer, but it could represent any sort of asynchronous effort, such as i/o.
private object _gate = new object(); private IEnumerable<Task> _TestMutexFail() { lock (_gate) { using (var task_wait = Task.Factory.StartNewDelayed(10)) { yield return task_wait; task_wait.Wait(); } } } private Task TestMutexFail() { return Task.Factory.Iterate(_TestMutexFail()); } [TestMethod] public void TestAsyncMutexFail() { var tasks = Enumerable.Range(0, 10).Select(i => TestMutexFail()).ToArray(); Task.WaitAll(tasks); }
But since the thread running the timer callback is likely not the same thread that initiated the timer (through StartNewDelayed), you'll see a System.Threading.SynchronizationLockException
with the message "Object synchronization method was called from an unsynchronized block of code." This is the same result you would also expect where an i/o completion port thread executes the continuation that represents the completion of the i/o task initiated by your asynchronous operation.
So what is a solution? I've had good success with a simple class such as the following.
public sealed class AsyncMutex { public Task<IDisposable> AcquireAsync() { var task_completion_source = new TaskCompletionSource<IDisposable>(); lock (_mutex) { bool can_obtain_lock = (_waiters.Count == 0); _waiters.Enqueue(task_completion_source); if (can_obtain_lock) { _PassTheLock(); } } return task_completion_source.Task; } public void Release() { lock (_mutex) { _waiters.Dequeue(); _PassTheLock(); } } public void Cancel() { lock (_mutex) { foreach (var waiter in _waiters) { waiter.TrySetCanceled(); } _waiters.Clear(); } } public object SynchronousMutex { get { return _mutex; } } private void _PassTheLock() { lock (_mutex) { bool have_waiters = (_waiters.Count > 0); if (have_waiters) { var next_acquirer = _waiters.First(); next_acquirer.TrySetResult(new AsyncLock(this)); } } } private sealed class AsyncLock : IDisposable { private readonly AsyncMutex _async_mutex; public AsyncLock(AsyncMutex async_mutex) { _async_mutex = async_mutex; } public void Dispose() { _async_mutex.Release(); } } private readonly object _mutex = new object(); private readonly Queue<TaskCompletionSource<IDisposable>> _waiters = new Queue<TaskCompletionSource<IDisposable>>(); }
This AsynchronousMutex which enables mutual exclusion across asynchronous tasks. It fits well with the iterator-based methods for composing asynchronous tasks.
private AsyncMutex _mutex = new AsyncMutex(); private int _concurrency_count = 0; private IEnumerable<Task> _TestAsyncMutexPass() { using (var task_acquire = _mutex.AcquireAsync()) { yield return task_acquire; using (task_acquire.Result) { Assert.AreEqual(1, Interlocked.Increment(ref _concurrency_count)); using (var task_wait = Task.Factory.StartNewDelayed(10)) { yield return task_wait; task_wait.Wait(); } Assert.AreEqual(0, Interlocked.Decrement(ref _concurrency_count)); } } } private Task TestAsyncMutexPass() { return Task.Factory.Iterate(_TestAsyncMutexPass()); } [TestMethod] public void TestAsyncMutex() { var tasks = Enumerable.Range(0, 10).Select(i => TestAsyncMutexPass()).ToArray(); Task.WaitAll(tasks); }
By the way, if you're just using a Task to lazily initialize a resource, I can recommend the ParallelExtensionsExtras Tour - #12 - AsyncCache blog post as another method to accomplish the same goal.
No comments:
Post a Comment