When I first started the project, I needed a mechanism for composable filters over a shared IP port, so that I could run multiple copies of Paxos on the same port. The Reactive Extensions met that need. It then proved very natural to express the Paxos agents (e.g. proposer, acceptor, learner) in terms of an actor model using Observable.CreateWithDisposable, and the agents themselves could be composed out of smaller parts (e.g. WhenMajorityHasQuorum, Phase1Lead, Phase2Propose).
Everything is exposed through a replicated StateMachine with a user-provided “execute” method for the next position in the replicated transaction log:
public abstract Task ExecuteAsync(int instance, Command command);
The Reactive Extensions helped here too by providing a bridge to Tasks.
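To make the ExecuteAsync contract concrete, here is a minimal sketch of a user-provided state machine. Only the ExecuteAsync signature comes from the project; the Command shape (a key and a value), the StateMachine stand-in, KeyValueStateMachine, and the assumption that the replication layer calls ExecuteAsync once per log position, in order, are all hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical command type: set a key to a value.
public sealed class Command
{
    public Command(string key, string value) { Key = key; Value = value; }
    public string Key { get; }
    public string Value { get; }
}

// Stand-in for the replicated base class; only this signature is from the post.
public abstract class StateMachine
{
    public abstract Task ExecuteAsync(int instance, Command command);
}

// A toy key-value store that applies commands in log order.
public sealed class KeyValueStateMachine : StateMachine
{
    private readonly Dictionary<string, string> store = new Dictionary<string, string>();
    private int nextInstance; // next log position we expect to execute

    public IReadOnlyDictionary<string, string> Store => store;

    public override Task ExecuteAsync(int instance, Command command)
    {
        // Assumption: positions arrive in order, starting at 0, with no gaps.
        if (instance != nextInstance)
            throw new InvalidOperationException(
                "Expected instance " + nextInstance + ", got " + instance);

        store[command.Key] = command.Value;
        nextInstance++;
        return Task.CompletedTask; // nothing asynchronous to wait for here
    }
}
```

Because every replica applies the same commands at the same positions, each copy of the store should end up in the same state, which is the point of replicating the log rather than the data.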
The test scheduler, together with a fixed random seed for the randomized exponential backoff, was very useful for running deterministic unit tests. Because the message handling is composable, it was trivial to add a “noisy observable” that simulates packet loss and duplication to stress test the implementation. One small issue I found was that the conversion from Tasks to IObservable was not deterministic, so I needed a slightly modified ToObservable that passes TaskContinuationOptions.ExecuteSynchronously to Task.ContinueWith:
public static IObservable<T> ToObservable<T>(this Task<T> task)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        // Disposing the subscription cancels the pending continuation.
        var disposable = new CancellationDisposable();
        task.ContinueWith(antecedent =>
        {
            switch (antecedent.Status)
            {
                case TaskStatus.RanToCompletion:
                    observer.OnNext(task.Result);
                    observer.OnCompleted();
                    break;
                case TaskStatus.Canceled:
                    observer.OnCompleted();
                    break;
                case TaskStatus.Faulted:
                    observer.OnError(antecedent.Exception);
                    break;
            }
        },
        disposable.Token,
        TaskContinuationOptions.ExecuteSynchronously, // run inline so ordering is deterministic
        TaskScheduler.Current);
        return disposable;
    });
}
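As an illustration of the “noisy observable” idea mentioned above, here is a minimal sketch that uses only the IObservable<T> and IObserver<T> interfaces from the base class library. It is not the code from the project: NoisyObservable, ListSource, CollectingObserver, and the dropRate and duplicateRate parameters are hypothetical names, and the shared Random assumes single-threaded delivery, as under a test scheduler.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: forwards each message zero, one, or two times.
public sealed class NoisyObservable<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Random random;
    private readonly double dropRate;
    private readonly double duplicateRate;

    public NoisyObservable(IObservable<T> source, int seed, double dropRate, double duplicateRate)
    {
        this.source = source;
        this.random = new Random(seed); // fixed seed keeps the fault pattern repeatable
        this.dropRate = dropRate;
        this.duplicateRate = duplicateRate;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(new NoisyObserver(this, observer));
    }

    private sealed class NoisyObserver : IObserver<T>
    {
        private readonly NoisyObservable<T> owner;
        private readonly IObserver<T> inner;

        public NoisyObserver(NoisyObservable<T> owner, IObserver<T> inner)
        {
            this.owner = owner;
            this.inner = inner;
        }

        public void OnNext(T value)
        {
            if (owner.random.NextDouble() < owner.dropRate) return; // simulate packet loss
            inner.OnNext(value);
            if (owner.random.NextDouble() < owner.duplicateRate)
                inner.OnNext(value);                                // simulate duplication
        }

        public void OnError(Exception error) { inner.OnError(error); }
        public void OnCompleted() { inner.OnCompleted(); }
    }
}

// Minimal source for the demo: replays a fixed list to each subscriber.
public sealed class ListSource<T> : IObservable<T>
{
    private readonly IReadOnlyList<T> items;
    public ListSource(IReadOnlyList<T> items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable { public void Dispose() { } }
}

// Records everything it receives so a test can inspect the result.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { Completed = true; }
}
```

Wrapping a message stream as new NoisyObservable<int>(source, seed, 0.1, 0.1) and subscribing a CollectingObserver<int> gives the same pattern of drops and duplicates on every run with the same seed, which is what keeps a stress test reproducible.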