Phillip Trelford's Array

POKE 36879,255

Mini Rx: Observable Extensions

The Reactive Extensions (Rx) provide LINQ style querying capabilities for events in C#, VB.Net and JavaScript. Rx implements extension methods over IObservable<T>, just as LINQ to Objects provides a set of extension methods over IEnumerable<T>.


There has been a fair amount of Microsoft publicity on the Rx library in blogs, videos and talks. Demand for Rx skills in IT jobs has grown over the last 12 months. 

Reactive Extensions Demand Trend

That said, the current implementation of the Rx library has at least a couple of issues:

Given that LINQ is based partly on higher-order functions from functional programming, perhaps it’s not surprising that F# supported querying over events back in 2006. It’s also relatively trivial to expose this functionality to C# by defining compatible extension methods using the ExtensionAttribute, e.g.

open System
open System.Runtime.CompilerServices
[<Extension>]
type ObservableExtensions private () =
   [<Extension>]
   static member Select<'T,'U>(source:IObservable<'T>,selector:Func<'T,'U>) =
       source |> Observable.map selector.Invoke
   [<Extension>]
   static member Where<'T>(source:IObservable<'T>,predicate:Func<'T,bool>) =
       source |> Observable.filter predicate.Invoke
   [<Extension>]
   static member Subscribe<'T>(source:IObservable<'T>, action:Action<'T>) =
       source |> Observable.subscribe action.Invoke

This is already enough to provide basic LINQ syntax in C# for types implementing IObservable<T>:

var leftPressedMove =
    from e in mouseMove
    where e.LeftButton == MouseButtonState.Pressed
    select e;

F# custom events implement IObservable<'T> by default, and F# provides modules with higher-order functions (the Event and Observable modules) for both .Net events and the IObservable<'T> interface.
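As a small illustration of that point, here is a minimal sketch that queries a custom F# event through the Observable module. The names `numbers`, `source` and `received` are made up for this example and are not part of Mini Rx.

```fsharp
open System

// A custom F# event; Publish exposes it as IEvent<int>
let numbers = Event<int>()

// The upcast compiles because F# events implement IObservable<'T>
let source = numbers.Publish :> IObservable<int>

// Collects whatever reaches the subscriber (illustrative only)
let received = ResizeArray<string>()

let subscription =
    source
    |> Observable.filter (fun n -> n % 2 = 0)          // keep even numbers
    |> Observable.map (fun n -> sprintf "even: %d" n)  // project to a string
    |> Observable.subscribe received.Add               // observe the results

[1; 2; 3; 4] |> List.iter numbers.Trigger

// Disposing the subscription detaches the handler from the event
subscription.Dispose()
numbers.Trigger 6
```

After this runs, `received` holds "even: 2" and "even: 4"; the final trigger is not observed because the subscription has already been disposed.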

For C# a mechanism is needed to convert .Net events to an object that implements IObservable<T>. This can be achieved fairly concisely in F# using object expressions:

let FromEvent<'TEventArgs, 'TDelegate when 'TEventArgs :> EventArgs>
            (conversion:Func<Action<'TEventArgs>,'TDelegate>,
             addHandler:Action<'TDelegate>,
             removeHandler:Action<'TDelegate>) =
    { new IObservable<'TEventArgs> with
        member this.Subscribe(observer:IObserver<_>) =
            let handler = Action<_>(observer.OnNext) |> conversion.Invoke
            addHandler.Invoke handler
            let remove () = removeHandler.Invoke handler
            { new IDisposable with member this.Dispose() = remove () } }

Converting from the event in C# still feels a little convoluted, though:

var mouseMove =
    Observable.FromEvent<MouseEventArgs, MouseEventHandler>(
        f => new MouseEventHandler((sender, args) => f(args)),
        handler => MouseMove += handler,
        handler => MouseMove -= handler);

Again for C# a mechanism is required for directly creating objects that can be both a source of events and be used to observe events. Rx follows the Observer pattern and provides a type called Subject that implements both IObserver<T> and IObservable<T>.
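To make the idea concrete, here is a minimal sketch of a type that is both an observer and an observable. `SimpleSubject` is a hypothetical name used only for this illustration. It is not the version published on F# Snippets, it is not thread-safe, and unlike Rx's Subject<T> it does not track completion.

```fsharp
open System

/// Hypothetical minimal subject: forwards every notification to its subscribers
type SimpleSubject<'T>() =
    let observers = ResizeArray<IObserver<'T>>()
    // Iterate over a snapshot so observers may unsubscribe while being notified
    let notifyAll (f: IObserver<'T> -> unit) = observers.ToArray() |> Array.iter f
    interface IObserver<'T> with
        member this.OnNext(value) = notifyAll (fun o -> o.OnNext value)
        member this.OnError(error) = notifyAll (fun o -> o.OnError error)
        member this.OnCompleted() = notifyAll (fun o -> o.OnCompleted())
    interface IObservable<'T> with
        member this.Subscribe(observer) =
            observers.Add observer
            { new IDisposable with
                member this.Dispose() = observers.Remove observer |> ignore }

// Usage: push values in through IObserver, watch them through IObservable
let subject = SimpleSubject<int>()
let seen = ResizeArray<int>()
let subscription =
    (subject :> IObservable<int>) |> Observable.subscribe seen.Add
let observer = subject :> IObserver<int>
observer.OnNext 1
observer.OnNext 2
subscription.Dispose()
observer.OnNext 3   // nobody is subscribed any more
```

Here `seen` ends up containing 1 and 2 only. A production version would need to guard the observer list against concurrent access.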

Earlier in the year I put up two simple types on F# Snippets that are functionally equivalent to Rx’s Subject<T> and ReplaySubject<T>:

The ReplaySubject implementation uses F# Agents to simplify concurrency.

The types can be used easily from C#:

var s = new Subject<int>();

For Silverlight and WPF we need a mechanism for invoking methods on the UI thread, which I implemented in F# back in 2010 (see "Implementing IObservable and extending Observable").

mouseMove
    .Select(e => e.GetPosition(canvas))
    .Delay(closure * 100)
    .Subscribe(pos =>
    {
        Canvas.SetLeft(label, pos.X + closure * 10);
        Canvas.SetTop(label, pos.Y);
    });
Putting it all together, the inevitable "time flies like an arrow" Silverlight demo:

In case you’d like to have a play yourself, I’ve put up a preview release on CodePlex:

Comments (1) -

  • Gustavo Guerra

    10/31/2011 4:27:00 PM |

    Cool. That would be nice to include in FSharpx
