Tuesday, August 23, 2011

C# Asynchronous Task Mutual Exclusion

If you're excited as I am about Visual Studio Asynchronous Programming, you may already be using a iterator method (such as Asynchronous methods, C# iterators, and Tasks) to simulate user-space threading. But once you have shared-state concurrency, you may want mutual exclusion.

Generally the performance recommendation for shared-state concurrency is to only take locks for a short period, especially not across an asynchronous operation (there is a correctness slant as well: minimizing lock time minimizes the risk of deadlock due to different lock acquisition orders). But sometimes you want to serialize asynchronous work - for example, lazy asynchronous initialization of a resource or serializing writes to a TCP network stream.

C# has the lock keyword allowing mutual exclusion using any reference object as the gate. In this example, I start 10 tasks that perform asynchronous "work" protected by mutual exclusion. This work is simulated by waiting 10 milliseconds with a timer, but it could represent any sort of asynchronous effort, such as i/o.

private object _gate = new object();

private IEnumerable<Task> _TestMutexFail()
{
    lock (_gate)
    {
        using (var task_wait = Task.Factory.StartNewDelayed(10))
        {
            yield return task_wait;
            task_wait.Wait();
        }
    }
}

private Task TestMutexFail()
{
    return Task.Factory.Iterate(_TestMutexFail());
}

[TestMethod]
public void TestAsyncMutexFail()
{
    var tasks = Enumerable.Range(0, 10).Select(i => TestMutexFail()).ToArray();
    Task.WaitAll(tasks);
}

But since the thread running the timer callback is likely not the same thread that initiated the timer (through StartNewDelayed), you'll see a System.Threading.SynchronizationLockException
with the message "Object synchronization method was called from an unsynchronized block of code." This is the same result you would also expect where an i/o completion port thread executes the continuation that represents the completion of the i/o task initiated by your asynchronous operation.

So what is a solution? I've had good success with a simple class such as the following.

public sealed class AsyncMutex
{
    public Task<IDisposable> AcquireAsync()
    {
        var task_completion_source = new TaskCompletionSource<IDisposable>();
        lock (_mutex)
        {
            bool can_obtain_lock = (_waiters.Count == 0);
            _waiters.Enqueue(task_completion_source);
            if (can_obtain_lock)
            {
                _PassTheLock();
            }
        }
        return task_completion_source.Task;
    }

    public void Release()
    {
        lock (_mutex)
        {
            _waiters.Dequeue();
            _PassTheLock();
        }
    }

    public void Cancel()
    {
        lock (_mutex)
        {
            foreach (var waiter in _waiters)
            {
                waiter.TrySetCanceled();
            }
            _waiters.Clear();
        }
    }

    public object SynchronousMutex
    {
        get
        {
            return _mutex;
        }
    }

    private void _PassTheLock()
    {
        lock (_mutex)
        {
            bool have_waiters = (_waiters.Count > 0);
            if (have_waiters)
            {
                var next_acquirer = _waiters.First();
                next_acquirer.TrySetResult(new AsyncLock(this));
            }
        }
    }

    private sealed class AsyncLock : IDisposable
    {
        private readonly AsyncMutex _async_mutex;

        public AsyncLock(AsyncMutex async_mutex)
        {
            _async_mutex = async_mutex;
        }

        public void Dispose()
        {
            _async_mutex.Release();
        }
    }

    private readonly object _mutex = new object();
    private readonly Queue<TaskCompletionSource<IDisposable>> _waiters = new Queue<TaskCompletionSource<IDisposable>>();
}

This AsynchronousMutex which enables mutual exclusion across asynchronous tasks. It fits well with the iterator-based methods for composing asynchronous tasks.

private AsyncMutex _mutex = new AsyncMutex();
private int _concurrency_count = 0;

private IEnumerable<Task> _TestAsyncMutexPass()
{
    using (var task_acquire = _mutex.AcquireAsync())
    {
        yield return task_acquire;
        using (task_acquire.Result)
        {
            Assert.AreEqual(1, Interlocked.Increment(ref _concurrency_count));

            using (var task_wait = Task.Factory.StartNewDelayed(10))
            {
                yield return task_wait;
                task_wait.Wait();
            }

            Assert.AreEqual(0, Interlocked.Decrement(ref _concurrency_count));
        }
    }
}

private Task TestAsyncMutexPass()
{
    return Task.Factory.Iterate(_TestAsyncMutexPass());
}

[TestMethod]
public void TestAsyncMutex()
{
    var tasks = Enumerable.Range(0, 10).Select(i => TestAsyncMutexPass()).ToArray();
    Task.WaitAll(tasks);
}

By the way, if you're just using a Task to lazily initialize a resource, I can recommend the ParallelExtensionsExtras Tour - #12 - AsyncCache blog post as another method to accomplish the same goal.

No comments:

Post a Comment