Phillip Trelford's Array

POKE 36879,255

Implementing IObservable and extending Observable

Following on from yesterday's post on implementing loosely coupled events, the following somewhat longer F# sample provides a simple System.IObservable<T> implementation for event consumers. This means that the loosely coupled events can now be consumed as easily as vanilla .NET events, using the sweeter syntactic sugar of the F# Observable module and/or the Reactive Extensions (Rx).

Also included are some helper functions for IObservable<T>. The first is tap, based on Ruby’s function of the same name, which lets you tap into an expression sequence for debugging, logging, etc. The next is invoke, which is used to implement both delay and onDispatcher, and which could also be used to provide functions for running on a background thread, etc. May your events be loose and happening.

Happening event broking interfaces:

/// Something that can be triggered with a value of type 'a
type ITriggerable<'a> =
    interface
        /// Raises the trigger with the given value
        abstract Trigger : 'a -> unit
    end
    
/// An event that can be both triggered (ITriggerable) and observed (IObservable)
type IHappen<'a> =
    interface
        inherit ITriggerable<'a>
        inherit System.IObservable<'a>
    end
            
/// Repository from which IHappen instances are obtained
type IHappenings =
    /// Returns the IHappen instance for the requested type 'a
    abstract ObtainHappening<'a> : unit -> IHappen<'a>

 

The Happening implementation:

/// Repository of IHappen instances.
/// Holds at most one happening per type 'a: the first request for a type creates
/// the happening, later requests for the same type return that same instance.
type Happenings () =
    // Happenings created so far, boxed and keyed by the type they carry
    let happenings = System.Collections.Generic.Dictionary<System.Type,_>()
    // Guards lookups and insertions on the dictionary above
    let happeningsLock = obj()
    interface IHappenings with
        /// Returns the happening for type 'a, creating and registering it on first use
        member this.ObtainHappening<'a> () =
            let CreateHappening () =
                // Current subscribers, most recently subscribed first
                let observers = ref []
                let add (observer:System.IObserver<'a>) =
                    observers.Value <- observer :: observers.Value
                let remove (observer:System.IObserver<'a>) =
                    // Keep every subscriber EXCEPT the one being unsubscribed.
                    // (List.filter retains the elements for which the predicate is true.)
                    observers.Value <-
                        observers.Value
                        |> List.filter (fun x -> not (observer.Equals(x)))
                let trigger x =
                    observers.Value
                    |> List.iter (fun (observer:System.IObserver<'a>) ->
                        observer.OnNext(x))
                // Guards add/remove on this happening's subscriber list
                let happenLock = obj()
                { new IHappen<'a> with
                    member this.Subscribe (observer:System.IObserver<'a>) =
                        lock happenLock (fun _ -> add observer)
                        // Disposing the returned handle unsubscribes the observer
                        { new System.IDisposable with
                            member this.Dispose() =
                                lock happenLock (fun _ -> remove observer)
                        }
                    member this.Trigger (x:'a) = trigger x
                }
            lock happeningsLock (fun _ ->
                match happenings.TryGetValue(typeof<'a>) with
                | true, happen -> unbox happen
                | false, _ ->
                    let happen = CreateHappening ()
                    happenings.Add(typeof<'a>,box happen)
                    happen
            )
    end

 

Observable helper methods (tap, invoke, delay, onDispatcher):

/// Functions operating on IObservable<T>
module Observable = 
    open System
    open System.Windows.Threading
        
    /// Observer helper lifted from lib\FSharp.Core\control.fs
    /// Forwards notifications to the abstract Next/Error/Completed members until the
    /// first OnError or OnCompleted arrives; after that every notification is ignored.
    [<AbstractClass>]  
    type BasicObserver<'a>() =
        // Set once the first terminal notification (error or completion) has been seen
        let mutable stopped = false
        /// Called for each value received while the observer is not stopped
        abstract Next : value : 'a -> unit
        /// Called at most once, for the first error received while not stopped
        abstract Error : error : exn -> unit
        /// Called at most once, for the first completion received while not stopped
        abstract Completed : unit -> unit
        interface IObserver<'a> with
            member x.OnNext value = 
                if not stopped then x.Next value
            member x.OnError e = 
                // Forward only the first terminal notification; later ones are dropped
                if not stopped then
                    stopped <- true
                    x.Error e
            member x.OnCompleted () = 
                // Forward only the first terminal notification; later ones are dropped
                if not stopped then
                    stopped <- true
                    x.Completed ()
    
    /// Runs the side effect `f` on every value flowing through `w` before passing
    /// the value on unchanged. If `f` throws, the exception is delivered to the
    /// subscriber through OnError instead of the value.
    let tap f (w:IObservable<_>) =
        // Run the side effect, capturing any exception it raises
        let attempt value =
            try
                f value
                None
            with failure -> Some failure
        let wrap (downstream:IObserver<_>) =
            { new BasicObserver<_>() with
                member this.Next(value) =
                    match attempt value with
                    | None -> downstream.OnNext value
                    | Some failure -> downstream.OnError failure
                member this.Error(failure) =
                    downstream.OnError(failure)
                member this.Completed() =
                    downstream.OnCompleted() }
        { new IObservable<_> with
            member this.Subscribe(downstream) =
                w.Subscribe(wrap downstream) }
   
    /// Routes every observer notification through the supplied function.
    /// `f` is handed a unit -> unit thunk that delivers the notification to the
    /// subscriber, so `f` decides when and where that delivery runs.
    let invoke f (w:IObservable<_>) =
        let wrap (downstream:IObserver<_>) =
            { new BasicObserver<_>() with
                member this.Next(value) =
                    f (fun () -> downstream.OnNext value)
                member this.Error(failure) =
                    f (fun () -> downstream.OnError(failure))
                member this.Completed() =
                    f (fun () -> downstream.OnCompleted()) }
        { new IObservable<_> with
            member this.Subscribe(downstream) =
                w.Subscribe(wrap downstream) }
 
    /// Postpones each observer notification by the given number of milliseconds.
    /// Every notification is started as its own async computation that sleeps
    /// and then delivers the notification.
    let delay milliseconds (observable:IObservable<'a>) =
        let postpone deliver =
            let work =
                async {
                    do! Async.Sleep(milliseconds)
                    deliver ()
                }
            Async.Start work
        observable |> invoke postpone
     
    /// <summary>Executes each observer notification on the Dispatcher thread.</summary>
    /// <remarks>
    /// For WPF and Silverlight. The dispatcher used is the current one at the
    /// moment onDispatcher is called; notifications are queued with BeginInvoke.
    /// </remarks>
    let onDispatcher (observable:IObservable<'a>) =
        let target = Dispatcher.CurrentDispatcher
        let marshal deliver =
            let queued = target.BeginInvoke(Action(fun _ -> deliver ()))
            ignore queued
        observable |> invoke marshal

 

To wrap up, Matthew Podwysocki’s Time Flies Like An Arrow in F# and the Reactive Extensions for .NET redux (Note: also worth checking Brian McNamara’s version):

module Test =  
    open System
    open System.Windows
    open System.Windows.Controls
    open System.Windows.Input
    open System.Windows.Media
    open System.Windows.Threading           
        
    /// Returns the mouse position carried by `args`, relative to `element`,
    /// as an (x, y) pair.
    let getPosition (element : #UIElement) (args : MouseEventArgs) =
        let location = args.GetPosition(element)
        location.X, location.Y
     
    /// Window that draws a sentence one letter at a time on a canvas and moves
    /// each letter to the mouse position reported through the MouseEventArgs
    /// happening, each letter with a longer delay and a larger x offset.
    type TimeFliesWindow(happenings:IHappenings) as this =
        inherit Window()

        let canvas = Canvas(Width=800.0, Height=400.0, Background=Brushes.White)
        do this.Content <- canvas

        // Adds one letter to the canvas and subscribes it to mouse movement
        let addLetter index (letter:char) =
            let block =
                TextBlock(Width=20.0,
                          Height=30.0,
                          FontSize=20.0,
                          Text=string letter,
                          Foreground=Brushes.Black,
                          Background=Brushes.White)
            canvas.Children.Add(block) |> ignore
            // Letter `index` sits 10 units further right than the previous one
            let moveTo (x, y) =
                Canvas.SetTop(block, y)
                Canvas.SetLeft(block, x + float (index * 10))
            // Consume mouse movement happenings
            happenings.ObtainHappening<MouseEventArgs>()
            |> Observable.map (getPosition canvas)
            |> Observable.tap (fun p -> Console.WriteLine p)
            |> Observable.delay (index * 100)
            |> Observable.onDispatcher
            |> Observable.subscribe moveTo
            |> ignore

        do "F# can react to second class events!" |> Seq.iteri addLetter
              
    /// Happenings repository shared by the window (consumer) and the publisher below
    let happenings = new Happenings() :> IHappenings
    /// The demo window; title corrected from "Time files" to "Time flies"
    let win = TimeFliesWindow(happenings,Title="Time flies like an arrow")   
    
    do  // Publish mouse movement event happenings:
        // every MouseMove raised by the window triggers the MouseEventArgs happening
        let happen = happenings.ObtainHappening<MouseEventArgs>()
        win.MouseMove         
        |> Observable.subscribe happen.Trigger |> ignore

    // Start the application message loop with the demo window
    [<STAThread>]
    do (new Application()).Run(win) |> ignore

Happening.fs (7.12 kb)

Pingbacks and trackbacks (7)+

Comments are closed