Wednesday 23 April 2014

Reactive Extensions Observables Versus Regular .NET Events Part 1

Since its original release, .NET has provided its own support for raising events and subscribing to them. This is essentially an implementation of the “Observer pattern”, and for the most part it works well, although the way it was implemented is less than ideal.

The shortcomings include the need to copy the event handler to a temporary variable before raising it, to avoid a nasty (but rare) race condition in which the last subscriber unsubscribes between the null check and the call; not being able to pass events around as objects (resulting in hacky workarounds in mocking and unit testing frameworks to deal with them); and the rather cumbersome standard event handler signature. That signature has the “sender” object as its first parameter (requiring you to cast it to the concrete type you already know it is if you want to make use of it), and expects the event arguments to derive from a rather pointless EventArgs base class.
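
To make the race condition concrete, here is a small sketch. The Ticker class and its method names are made up for illustration. It contrasts the unsafe check-then-call with the temporary-variable pattern, plus the null-conditional form that is available if you are on C# 6 or later.

```csharp
using System;

// Hypothetical class, used only to illustrate the three ways of raising an event
class Ticker
{
    public event EventHandler Tick;

    // Unsafe: another thread could remove the last handler after the null
    // check but before the call, so Tick may be null when it is invoked.
    public void RaiseUnsafe()
    {
        if (Tick != null) Tick(this, EventArgs.Empty);
    }

    // Safe: delegates are immutable, so once we hold a non-null copy in a
    // local variable, nothing another thread does can make it null.
    public void RaiseSafe()
    {
        var handler = Tick;
        if (handler != null) handler(this, EventArgs.Empty);
    }

    // C# 6 and later: the null-conditional operator reads Tick only once,
    // which gives the same guarantee as the local copy above.
    public void RaiseConcise()
    {
        Tick?.Invoke(this, EventArgs.Empty);
    }
}
```

Note that none of these protect a handler from being called just after it has unsubscribed; they only avoid the NullReferenceException.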

But there is another option, which doesn’t require you to create your own custom implementation of the observer pattern, and that is to make use of the Reactive Extensions (Rx) framework. This has been around for some time now, but perhaps hasn’t gained the attention it deserves. Rx essentially provides a much more powerful way of working with events.

In this post I’ll show how you can take .NET code that both raises and consumes regular .NET events and convert it to using Rx. And I plan to follow up with a future post that looks at the benefits of making this transition.

Raising .NET Events

First, let’s look at how you raise events the standard .NET way. This simple demo class has a Start method that kicks off a background task. It raises two types of event: Progress events while it is running, and a Finished event when it’s done, which carries an exception if something went wrong.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class EventDemo
{
    public event EventHandler<ProgressEventArgs> Progress;
    public event EventHandler<FinishedEventArgs> Finished;

    protected virtual void OnFinished(FinishedEventArgs e)
    {
        var handler = Finished;
        if (handler != null) handler(this, e);
    }

    protected virtual void OnProgress(ProgressEventArgs e)
    {
        var handler = Progress;
        if (handler != null) handler(this, e);
    }

    public void Start()
    {
        Task.Run((Action)DoStuff);
    }

    private void DoStuff()
    {
        try
        {
            foreach (var n in Enumerable.Range(1, 10))
            {
                Thread.Sleep(1000);
                OnProgress(new ProgressEventArgs(n));
            }
            OnFinished(new FinishedEventArgs());
        }
        catch (Exception exception)
        {
            OnFinished(new FinishedEventArgs(exception));
        }
    }
}

You can see here that we’ve created a couple of helper methods (OnFinished and OnProgress) that copy the handler to a local variable before invoking it, to avoid the race condition issue. We also needed to create a couple of event argument classes:

class ProgressEventArgs : EventArgs
{
    public ProgressEventArgs(int progress)
    {
        Progress = progress;
    }

    public int Progress { get; private set; }
}

class FinishedEventArgs : EventArgs
{
    public FinishedEventArgs(Exception exception = null)
    {
        Exception = exception;
    }

    public Exception Exception { get; private set; }
}

Subscribing to .NET events

To subscribe to the events raised by this class, we simply use the += operator and give it either the name of the event handler method or a lambda function to run. Here we use the first technique, since it makes unsubscribing easier if we need to do that later.

class EventConsumer
{
    private readonly EventDemo eventDemo;
    public EventConsumer()
    {
        eventDemo = new EventDemo();
        eventDemo.Progress += OnProgress;
        eventDemo.Finished += EventDemoOnFinished;
        eventDemo.Start();
    }

    private void EventDemoOnFinished(object sender, FinishedEventArgs finishedEventArgs)
    {
        Console.WriteLine("Finished {0}", finishedEventArgs.Exception == null ? "success" : finishedEventArgs.Exception.Message);
    }

    private void OnProgress(object sender, ProgressEventArgs progressEventArgs)
    {
        Console.WriteLine("Progress {0}", progressEventArgs.Progress);
    }
}
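
To illustrate why a named handler makes unsubscribing easier than a lambda, here is a minimal sketch. Publisher and UnsubscribeDemo are hypothetical names, not part of the demo classes above. Removing a named method works because the two delegates compare equal, whereas a second, textually identical lambda is in practice a different delegate, so the -= removes nothing.

```csharp
using System;

// Hypothetical publisher with a single event
class Publisher
{
    public event EventHandler Ping;

    public void Raise()
    {
        var handler = Ping;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

static class UnsubscribeDemo
{
    static int calls;

    static void OnPing(object sender, EventArgs e) { calls++; }

    // Returns how many times the handler ran after unsubscribing by name
    public static int NamedHandler()
    {
        calls = 0;
        var publisher = new Publisher();
        publisher.Ping += OnPing;
        publisher.Ping -= OnPing;   // same target and method, so it is removed
        publisher.Raise();
        return calls;
    }

    // Returns how many times the handler ran after "unsubscribing" a lambda
    public static int LambdaHandler()
    {
        calls = 0;
        var publisher = new Publisher();
        publisher.Ping += (s, e) => calls++;
        publisher.Ping -= (s, e) => calls++;   // a different delegate; nothing is removed
        publisher.Raise();
        return calls;
    }
}
```

If you do want to use a lambda and unsubscribe later, you need to keep the delegate in a variable and pass that same variable to both += and -=.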

Raising Rx Events

Now let’s have a look at how we might implement the same thing, but using Reactive Extensions instead. We might be tempted to think that the Rx version of our class should mirror the .NET version as closely as possible, so we’d add two IObservable properties as the equivalents of the Progress and Finished events. The easy way to create IObservables is to make instances of the Rx Subject class, and the “events” can be fired by calling the OnNext method on the subject. The event argument can be any object; there is no need to make it inherit from EventArgs.

class ObservableDemoNaive
{
    private readonly Subject<Progress> progressSubject;
    private readonly Subject<Finished> finishedSubject;

    public ObservableDemoNaive()
    {
        progressSubject = new Subject<Progress>();
        finishedSubject = new Subject<Finished>();
    }

    public IObservable<Progress> Progress { get { return progressSubject; } }

    public IObservable<Finished> Finished { get { return finishedSubject; } }

    public void Start()
    {
        Task.Run(() => DoStuff());
    }

    private void DoStuff()
    {
        try
        {
            foreach (var n in Enumerable.Range(1, 10))
            {
                Thread.Sleep(1000);
                progressSubject.OnNext(new Progress(n));
            }
            finishedSubject.OnNext(new Finished());
        }
        catch (Exception exception)
        {
            finishedSubject.OnNext(new Finished { Exception = exception });
        }
    }
}
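
The Progress and Finished types used here are not listed in this post. A minimal sketch of what they might look like, inferred only from how they are used (a constructor taking the progress number, a Value property read by the subscriber later on, and an Exception property set with an object initializer); the exact shape is an assumption:

```csharp
using System;

// Assumed shape: a plain class carrying the progress number.
// Note there is no EventArgs base class.
class Progress
{
    public Progress(int value)
    {
        Value = value;
    }

    public int Value { get; private set; }
}

// Assumed shape: Exception is null when the operation succeeded
class Finished
{
    public Exception Exception { get; set; }
}
```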

Whilst this works, it isn’t a particularly good example of using Reactive Extensions. For starters, the Finished event is trying to solve a problem that Rx already solves for us. An observable can tell us when it has completed, as well as when it has failed with an error. So we can simplify this to work with a single Observable:

class ObservableDemoBetter
{
    private readonly Subject<Progress> progressSubject;

    public ObservableDemoBetter()
    {
        progressSubject = new Subject<Progress>();
    }

    public IObservable<Progress> Progress { get { return progressSubject; } }

    public void Start()
    {
        Task.Run(() => DoStuff());
    }

    private void DoStuff()
    {
        try
        {
            foreach (var n in Enumerable.Range(1, 10))
            {
                Thread.Sleep(1000);
                progressSubject.OnNext(new Progress(n));
            }
            progressSubject.OnCompleted();
        }
        catch (Exception exception)
        {
            progressSubject.OnError(exception);
        }
    }
}

Note that the OnCompleted method of the Subject class doesn’t take any parameters. So if your original FinishedEventArgs contained additional data, a different way of passing it back would be needed.
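
One possible way of passing such data back, sketched here as an assumption rather than a recommendation, is to let the element type carry it and send it as the last item before completing. The Update, RecordingObserver and FinalValueDemo names are hypothetical. The sketch uses only the IObserver interface from the base class library, so it does not need the Rx package, but the same Run method would accept an Rx Subject since that implements IObserver too.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical element type: a progress update, or the final summary
class Update
{
    public int Progress { get; set; }
    public string Summary { get; set; }   // null for ordinary progress items
}

// Minimal observer that just records what it is told
class RecordingObserver : IObserver<Update>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(Update update)
    {
        Log.Add(update.Summary ?? "Progress " + update.Progress);
    }

    public void OnError(Exception error) { Log.Add("Error " + error.Message); }

    public void OnCompleted() { Log.Add("Completed"); }
}

static class FinalValueDemo
{
    // Sends three progress items, then the extra data, then completes
    public static void Run(IObserver<Update> observer)
    {
        for (var n = 1; n <= 3; n++)
        {
            observer.OnNext(new Update { Progress = n });
        }
        observer.OnNext(new Update { Summary = "Processed 3 items" });
        observer.OnCompleted();
    }
}
```

The downside of this design is that subscribers have to tell the two kinds of item apart, so it is only worth it if the extra data really belongs with the stream.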

However, in making this change, we’ve actually modified the behaviour. An observable sequence can only complete once, so if you were to call Start twice, you’d not see any results after the first sequence finished. In fact, this probably highlights a design flaw in our original class – if two operations are going on simultaneously, you have no idea which one the Progress and Finished events relate to.

This can be solved by making our Start method return the IObservable stream of progress events for the particular operation that was started:

class ObservableDemo
{
    public IObservable<Progress> Start()
    {
        var subject = new Subject<Progress>();
        Task.Run(() => DoStuff(subject));
        return subject;
    }

    private void DoStuff(Subject<Progress> subject)
    {
        try
        {
            foreach (var n in Enumerable.Range(1, 10))
            {
                Thread.Sleep(1000);
                subject.OnNext(new Progress(n));
            }
            subject.OnCompleted();
        }
        catch (Exception exception)
        {
            subject.OnError(exception);
        }
        finally
        {
            subject.Dispose();
        }
    }
}

Now we have a much cleaner, simpler interface, and there can be no confusion about which background task is raising the events if we call Start twice.

Subscribing to Rx Events

Subscribing to Rx events is nice and simple. You can provide an action that is called for each event and, optionally, actions for when the sequence completes successfully or fails with an error, as shown here:

var demo = new ObservableDemo();
var observable = demo.Start();
observable.Subscribe(
    p => Console.WriteLine("Progress {0}", p.Value),
    e => Console.WriteLine("Error {0}", e.Message),
    () => Console.WriteLine("Finished success"));

As can be seen, the Rx versions of both the event producer and consumer code are slightly more succinct than the equivalent regular .NET event code.

The Story So Far

So far we’ve seen that it is fairly straightforward to switch from .NET events to Rx Observables, but you may want to consider changing how you work with them to fit more nicely with the way Rx works. In the next post we’ll look at other benefits that Observables bring to the table such as better unsubscribing, and the ability to manipulate and filter the events with a LINQ-style syntax, as well as seeing how to turn existing .NET events into Observables.

If you want to play with Reactive Extensions yourself, the easy way is to simply add the Rx NuGet package to your project (Rx-Main at the time of writing; later releases ship as System.Reactive).
