Tuesday, August 23, 2011

Paxos, Reactive Framework, Nondeterminism and Design for Testability

For a project at work, I implemented the Paxos distributed consensus algorithm.

When I first started the project, I needed composable filters over a shared IP port so that I could run multiple copies of Paxos on the same port. The Reactive Extensions met that need, and it then proved very natural to express the Paxos agents (e.g. proposer, acceptor, learner) as actors using Observable.CreateWithDisposable. The agents, in turn, were composable out of smaller parts (e.g. WhenMajorityHasQuorum, Phase1Lead, Phase2Propose).
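The post does not show WhenMajorityHasQuorum itself. As a rough, hypothetical sketch of the rule such a combinator has to enforce, the class below counts distinct acceptor responses and reports a quorum once a strict majority has answered. MajorityQuorum, Record and Threshold are illustrative names, not the author's API. Ignoring duplicate responses matters because the network may deliver the same message twice.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not the author's WhenMajorityHasQuorum: it only
// illustrates the counting rule a majority-quorum combinator relies on.
public sealed class MajorityQuorum
{
    private readonly int _acceptorCount;
    private readonly HashSet<string> _responders = new HashSet<string>();

    public MajorityQuorum(int acceptorCount)
    {
        if (acceptorCount <= 0) throw new ArgumentOutOfRangeException(nameof(acceptorCount));
        _acceptorCount = acceptorCount;
    }

    // A strict majority: more than half of all acceptors.
    public int Threshold => _acceptorCount / 2 + 1;

    public bool HasQuorum => _responders.Count >= Threshold;

    // Records a response; a duplicate from the same acceptor is ignored.
    // Returns true once a strict majority of distinct acceptors has answered.
    public bool Record(string acceptorId)
    {
        _responders.Add(acceptorId);
        return HasQuorum;
    }
}
```

With five acceptors the threshold is three, so a retransmitted response from an acceptor that already answered does not move the count.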

Everything is exposed through a replicated StateMachine with a user-provided method that executes the command for the next position in the replicated transaction log:

public abstract Task ExecuteAsync(int instance, Command command);

The Reactive Extensions helped here too, by providing a bridge to Tasks.

The test scheduler (together with a fixed random seed for the randomized exponential backoff) was very useful for running deterministic unit tests, and because the message handling is composable, it was trivial to add a “noisy observable” that simulates packet loss and duplication to stress-test the implementation. One small issue I found was that the conversion from Tasks to IObservable was not deterministic, because the continuation was queued to run asynchronously rather than at a predictable point. I needed a slightly modified ToObservable that passes TaskContinuationOptions.ExecuteSynchronously to Task.ContinueWith:

public static IObservable<T> ToObservable<T>(this Task<T> task)
{
    return Observable.CreateWithDisposable<T>(observer =>
        {
            // Disposing the subscription cancels the continuation if it has not run yet.
            var disposable = new CancellationDisposable();

            // ExecuteSynchronously runs the continuation on the thread that completes
            // the task instead of queuing it, which keeps test runs deterministic.
            task.ContinueWith(antecedent =>
            {
                switch (antecedent.Status)
                {
                    case TaskStatus.RanToCompletion:
                        observer.OnNext(antecedent.Result);
                        observer.OnCompleted();
                        break;
                    case TaskStatus.Canceled:
                        // A canceled task is treated as an empty sequence.
                        observer.OnCompleted();
                        break;
                    case TaskStatus.Faulted:
                        observer.OnError(antecedent.Exception);
                        break;
                }
            }, disposable.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);

            return disposable;
        });
}
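The “noisy observable” from the Paxos tests is not shown in the post. A minimal sketch of the idea, assuming only the IObservable<T> and IObserver<T> interfaces from the base class library (no Rx operators), wraps a message source and uses a seeded System.Random to drop or duplicate messages. NoisyObservable, ListObservable and CollectingObserver are hypothetical names written for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical fault-injection wrapper (not the author's code). With a fixed seed,
// every subscription sees the same sequence of drops and duplicates.
public sealed class NoisyObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly double _dropProbability;
    private readonly double _duplicateProbability;
    private readonly int _seed;

    public NoisyObservable(IObservable<T> source, double dropProbability, double duplicateProbability, int seed)
    {
        _source = source;
        _dropProbability = dropProbability;
        _duplicateProbability = duplicateProbability;
        _seed = seed;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // A fresh Random per subscription keeps each run reproducible.
        var random = new Random(_seed);
        return _source.Subscribe(new NoisyObserver(observer, random, _dropProbability, _duplicateProbability));
    }

    private sealed class NoisyObserver : IObserver<T>
    {
        private readonly IObserver<T> _inner;
        private readonly Random _random;
        private readonly double _drop;
        private readonly double _duplicate;

        public NoisyObserver(IObserver<T> inner, Random random, double drop, double duplicate)
        {
            _inner = inner;
            _random = random;
            _drop = drop;
            _duplicate = duplicate;
        }

        public void OnNext(T value)
        {
            if (_random.NextDouble() < _drop) return;                     // simulated packet loss
            _inner.OnNext(value);
            if (_random.NextDouble() < _duplicate) _inner.OnNext(value);  // simulated duplication
        }

        public void OnError(Exception error) => _inner.OnError(error);
        public void OnCompleted() => _inner.OnCompleted();
    }
}

// Demonstration helpers: a source that replays a list synchronously,
// and an observer that records everything it receives.
public sealed class ListObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> _items;
    public ListObservable(IEnumerable<T> items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return NoopDisposable.Instance;
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public void OnNext(T value) => Received.Add(value);
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { }
}
```

Because the Random is seeded per subscription, two runs with the same seed see identical faults, which is what makes a failing stress test reproducible.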

Instant Karma Links

I enjoy reading reddit. I have seen some links many times in the past, and I expect to see them again in the future.

Unskilled and Unaware of It

Dunbar's number

Can You Say ... "Hero"?

Stanley Milgram was responsible for the Small World Experiment (i.e. six degrees of separation) and the Milgram experiment on obedience to authority figures.

Unhappy Meals

Norman Borlaug

Simo Häyhä

Franz Stigler

North Korea is Dark

Shadow Caravan

snopes

Bobby Tables

Have you ever tried to sell a diamond?

The Difference between the United Kingdom, Great Britain, England, and a Whole Lot More

The Man Who Had HIV and Now Does Not

Mt. Everest has around 200 dead bodies (pics)

Monty Hall Problem

How Not to Sort By Average Rating

Birthday Paradox

Netflix Instant Watcher

Henrietta Lacks’ ‘Immortal’ Cells

Joe Ades

Drowning Doesn’t Look Like Drowning

Warning Signs in Experimental Design and Interpretation

Lies, Damned Lies, and Medical Science

C# Asynchronous Task Mutual Exclusion

If you're as excited as I am about Visual Studio Asynchronous Programming, you may already be using an iterator-based technique (such as the one described in "Asynchronous methods, C# iterators, and Tasks") to simulate user-space threading. But once you have shared-state concurrency, you may want mutual exclusion.

Generally the performance recommendation for shared-state concurrency is to only take locks for a short period, especially not across an asynchronous operation (there is a correctness slant as well: minimizing lock time minimizes the risk of deadlock due to different lock acquisition orders). But sometimes you want to serialize asynchronous work - for example, lazy asynchronous initialization of a resource or serializing writes to a TCP network stream.

C# has the lock keyword, which provides mutual exclusion using any reference object as the gate. In this example, I start 10 tasks that perform asynchronous "work" protected by lock. The work is simulated by waiting 10 milliseconds with a timer, but it could represent any sort of asynchronous effort, such as I/O. (StartNewDelayed and Iterate are extension methods from ParallelExtensionsExtras.)

private object _gate = new object();

private IEnumerable<Task> _TestMutexFail()
{
    lock (_gate)
    {
        using (var task_wait = Task.Factory.StartNewDelayed(10))
        {
            yield return task_wait;
            task_wait.Wait();
        }
    }
}

private Task TestMutexFail()
{
    return Task.Factory.Iterate(_TestMutexFail());
}

[TestMethod]
public void TestAsyncMutexFail()
{
    var tasks = Enumerable.Range(0, 10).Select(i => TestMutexFail()).ToArray();
    Task.WaitAll(tasks);
}

But since the thread running the timer callback is likely not the same thread that initiated the timer (through StartNewDelayed), you'll see a System.Threading.SynchronizationLockException with the message "Object synchronization method was called from an unsynchronized block of code." The lock statement is built on Monitor, which is thread-affine: the thread that resumes the iterator after the yield tries to release a lock that a different thread acquired. You would see the same result when an I/O completion port thread executes the continuation that represents the completion of the I/O started by your asynchronous operation.
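To see that thread affinity in isolation, the following sketch acquires a Monitor on one thread and tries to release it from another; the second thread gets the same SynchronizationLockException. MonitorAffinityDemo is a hypothetical name used only for this illustration.

```csharp
using System;
using System.Threading;

public static class MonitorAffinityDemo
{
    // Returns true if releasing a Monitor from a thread that does not own it
    // throws SynchronizationLockException.
    public static bool ExitFromOtherThreadThrows()
    {
        var gate = new object();
        Monitor.Enter(gate); // acquired on the calling thread
        try
        {
            Exception caught = null;
            var other = new Thread(() =>
            {
                try { Monitor.Exit(gate); }          // not the owner: this throws
                catch (Exception ex) { caught = ex; }
            });
            other.Start();
            other.Join();
            return caught is SynchronizationLockException;
        }
        finally
        {
            Monitor.Exit(gate); // released by the owning thread, which succeeds
        }
    }
}
```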

So what is a solution? I've had good success with a simple class such as the following.

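using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class AsyncMutex
{
    // Returns a task that completes with an IDisposable once the caller holds the mutex;
    // disposing it releases the mutex to the next waiter in FIFO order.
    public Task<IDisposable> AcquireAsync()
    {
        var task_completion_source = new TaskCompletionSource<IDisposable>();
        lock (_mutex)
        {
            bool can_obtain_lock = (_waiters.Count == 0);
            _waiters.Enqueue(task_completion_source);
            if (can_obtain_lock)
            {
                _PassTheLock();
            }
        }
        return task_completion_source.Task;
    }

    public void Release()
    {
        lock (_mutex)
        {
            // The head of the queue is always the current holder.
            _waiters.Dequeue();
            _PassTheLock();
        }
    }

    // Cancels every pending waiter. The current holder (head of the queue) is kept,
    // so its later Release still dequeues the right entry and no waiter gets in early.
    public void Cancel()
    {
        lock (_mutex)
        {
            var holder = (_waiters.Count > 0) ? _waiters.Dequeue() : null;
            foreach (var waiter in _waiters)
            {
                waiter.TrySetCanceled();
            }
            _waiters.Clear();
            if (holder != null)
            {
                _waiters.Enqueue(holder);
            }
        }
    }

    public object SynchronousMutex
    {
        get
        {
            return _mutex;
        }
    }

    // Grants the mutex to the waiter at the head of the queue. Callers must hold _mutex.
    private void _PassTheLock()
    {
        bool have_waiters = (_waiters.Count > 0);
        if (have_waiters)
        {
            var next_acquirer = _waiters.Peek();
            next_acquirer.TrySetResult(new AsyncLock(this));
        }
    }

    private sealed class AsyncLock : IDisposable
    {
        private AsyncMutex _async_mutex;

        public AsyncLock(AsyncMutex async_mutex)
        {
            _async_mutex = async_mutex;
        }

        // Idempotent: only the first Dispose releases the mutex.
        public void Dispose()
        {
            var async_mutex = Interlocked.Exchange(ref _async_mutex, null);
            if (async_mutex != null)
            {
                async_mutex.Release();
            }
        }
    }

    private readonly object _mutex = new object();
    private readonly Queue<TaskCompletionSource<IDisposable>> _waiters = new Queue<TaskCompletionSource<IDisposable>>();
}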

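This AsyncMutex enables mutual exclusion across asynchronous tasks: AcquireAsync returns a task that completes once the caller holds the mutex, and disposing the task's result releases it to the next waiter. It fits well with the iterator-based methods for composing asynchronous tasks.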

private AsyncMutex _mutex = new AsyncMutex();
private int _concurrency_count = 0;

private IEnumerable<Task> _TestAsyncMutexPass()
{
    using (var task_acquire = _mutex.AcquireAsync())
    {
        yield return task_acquire;
        using (task_acquire.Result)
        {
            Assert.AreEqual(1, Interlocked.Increment(ref _concurrency_count));

            using (var task_wait = Task.Factory.StartNewDelayed(10))
            {
                yield return task_wait;
                task_wait.Wait();
            }

            Assert.AreEqual(0, Interlocked.Decrement(ref _concurrency_count));
        }
    }
}

private Task TestAsyncMutexPass()
{
    return Task.Factory.Iterate(_TestAsyncMutexPass());
}

[TestMethod]
public void TestAsyncMutex()
{
    var tasks = Enumerable.Range(0, 10).Select(i => TestAsyncMutexPass()).ToArray();
    Task.WaitAll(tasks);
}
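On .NET 4.5 and later, with async/await, a commonly used alternative to a hand-written class like AsyncMutex is SemaphoreSlim with an initial and maximum count of 1, whose WaitAsync method can be awaited. The sketch below uses that swapped-in technique, not the AsyncMutex above; SerializedWorker is a hypothetical name, and it records the maximum observed concurrency in the same spirit as the test above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Swapped-in alternative (not the AsyncMutex above): SemaphoreSlim(1, 1)
// used as an async-compatible mutex. Requires .NET 4.5 or later.
public sealed class SerializedWorker
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private int _concurrency;

    public int MaxObservedConcurrency { get; private set; }

    public async Task DoWorkAsync()
    {
        await _gate.WaitAsync();
        try
        {
            int now = Interlocked.Increment(ref _concurrency);
            if (now > MaxObservedConcurrency) MaxObservedConcurrency = now;
            await Task.Delay(10); // simulated asynchronous work, held across the await
            Interlocked.Decrement(ref _concurrency);
        }
        finally
        {
            _gate.Release(); // always release, even if the work throws
        }
    }
}
```

One design difference: AsyncMutex grants the lock strictly in FIFO order through its queue, whereas SemaphoreSlim documents no guaranteed ordering among waiters.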

By the way, if you're only using a Task to lazily initialize a resource, I can recommend the ParallelExtensionsExtras Tour - #12 - AsyncCache blog post, which describes another way to accomplish the same goal.
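Another common pattern for lazy asynchronous initialization wraps a Lazy<Task<T>>, so that the factory runs at most once and every caller shares the same Task. This is a sketch assuming .NET 4.5 or later for async lambdas; AsyncLazy<T> is a hypothetical helper written for this illustration, not a type from ParallelExtensionsExtras.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: the factory runs at most once, and all callers
// await the same Task, so concurrent callers share one initialization.
public sealed class AsyncLazy<T>
{
    private readonly Lazy<Task<T>> _lazy;

    public AsyncLazy(Func<Task<T>> factory)
    {
        // ExecutionAndPublication: only one thread ever invokes the factory.
        _lazy = new Lazy<Task<T>>(factory, LazyThreadSafetyMode.ExecutionAndPublication);
    }

    public Task<T> Value => _lazy.Value;
}
```

Note that if the factory fails, the faulted Task is cached, and every later caller observes the same exception.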

Best Effort Cloning in C#

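using System;
using System.Reflection;

// Object.MemberwiseClone is protected, so the fallback invokes it through reflection.
private static readonly MethodInfo _method_memberwise_clone = typeof(object).GetMethod("MemberwiseClone", BindingFlags.Instance | BindingFlags.NonPublic);

// Best effort: prefer the type's own ICloneable implementation;
// otherwise fall back to a shallow, field-by-field copy.
public static T Clone<T>(T item)
{
    if (item == null)
    {
        return item; // nothing to clone
    }

    var cloneable = item as ICloneable;
    if (cloneable != null)
    {
        return (T)cloneable.Clone();
    }
    else
    {
        return (T)_method_memberwise_clone.Invoke(item, null);
    }
}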
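The MemberwiseClone fallback produces a shallow copy: value-type fields are copied, but reference-type fields still point at the same objects. The following illustration calls MemberwiseClone directly from inside a type, which reaches the same method as the reflection call above. Message and ShallowCopy are hypothetical names.

```csharp
using System.Collections.Generic;

// Illustrative type: MemberwiseClone is protected, so a type can call it on itself.
public sealed class Message
{
    public int Sequence;                           // value type: copied
    public List<string> Tags = new List<string>(); // reference type: shared by the copy

    public Message ShallowCopy() => (Message)MemberwiseClone();
}
```

Changing copy.Sequence leaves the original untouched, but adding to copy.Tags also changes original.Tags, which is why this style of cloning is only "best effort".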

BadImageFormatException with ASP.NET and native dlls

Sometimes you want to use a native DLL with ASP.NET, but the ASP.NET compilation process fails with a BadImageFormatException: the compiler tries to open every DLL in the bin directory, and the compiler and the native DLL target different architectures (e.g. one is x86 and the other amd64). The following web.config setting avoids this issue. In this example, native.dll is the DLL that the ASP.NET compiler should ignore.

<?xml version="1.0"?>
<configuration>
  <system.web>
    <compilation debug="false">
      <assemblies>
        <remove assembly="native"/>
      </assemblies>
    </compilation>
  </system.web>
</configuration>