Phil Trelford's Array
POKE 36879, 255

Implementing IObservable and extending Observable

April 14, 2010 01:13 by Phil

Following on from yesterday's post on implementing loosely coupled events, the following somewhat longer F# sample provides a simple System.IObservable<T> implementation for event consumers. This means that the loosely coupled events can now be easily consumed like vanilla .NET events, using the sweeter syntactic sugar of the F# Observable module and/or the Reactive Extensions (Rx).

Also included are some helper functions for IObservable<T>. The first is tap, based on Ruby’s function of the same name, that lets you tap into an expression sequence for debugging, logging, etc. The next is invoke which is used to implement both delay and onDispatcher, and could be used to provide functions for running on a background thread, etc. May your events be loose and happening.

Happening event broking interfaces:

/// Encapsulates a triggerable instance: something that can be handed a
/// value of type 'a.
type ITriggerable<'a> =
    /// Triggers the instance with the given value.
    abstract member Trigger : 'a -> unit
    
/// Encapsulates an event that is both triggerable and observable:
/// it combines ITriggerable<'a> with System.IObservable<'a> and adds
/// no members of its own.
type IHappen<'a> = 
    inherit ITriggerable<'a> 
    inherit System.IObservable<'a>
            
/// Encapsulates repository of IHappen instances
type IHappenings =
    /// Returns an IHappen<'a> for the requested payload type 'a.
    /// Whether repeated calls yield the same instance is decided by the
    /// implementation; this signature does not say.
    abstract member ObtainHappening<'a> : unit -> IHappen<'a>

 

The Happening implementation:

/// Repository of IHappen instances, keyed by payload type: the first
/// request for a given 'a creates and stores a happening, later requests
/// for the same 'a return the stored one.
type Happenings () =
    // Maps typeof<'a> to the boxed IHappen<'a> created for that type.
    let happenings = System.Collections.Generic.Dictionary<System.Type,_>()
    // Guards every lookup and insertion on the dictionary above.
    let happeningsLock = obj()
    interface IHappenings with
        /// Returns the happening stored for 'a, creating and storing it
        /// when none exists yet.
        member this.ObtainHappening<'a> () =
            let CreateHappening () =
                // Current subscribers, most recently subscribed first.
                let observers : System.IObserver<'a> list ref = ref []
                let add (observer:System.IObserver<'a>) =
                    observers := observer :: !observers
                // Drop the disposed observer and keep everyone else.
                // (The predicate must be negated; filtering on equality
                // alone would retain only the observer being removed.)
                let remove (observer:System.IObserver<'a>) =
                    observers :=
                        !observers
                        |> List.filter (fun x -> not (observer.Equals(x)))
                // Pushes the value to each observer in the list as it is
                // at the moment of the call; the list is read without
                // taking happenLock.
                let trigger x =
                    !observers
                    |> List.iter (fun (observer:System.IObserver<'a>) ->
                        observer.OnNext(x))
                // Serialises additions to and removals from the list.
                let happenLock = obj()
                { new IHappen<'a> with
                    member this.Subscribe (observer:System.IObserver<'a>) =
                        lock happenLock (fun _ -> add observer)
                        // Disposing the returned handle unsubscribes.
                        { new System.IDisposable with
                            member this.Dispose() =
                                lock happenLock (fun _ -> remove observer)
                        }
                    member this.Trigger (x:'a) = trigger x
                }
            lock happeningsLock (fun _ ->
                match happenings.TryGetValue(typeof<'a>) with
                | true, happen -> unbox happen
                | false, _ ->
                    let happen = CreateHappening ()
                    happenings.Add(typeof<'a>,box happen)
                    happen
            )
    end

 

Observable helper methods (tap, invoke, delay, onDispatcher):

/// Functions operating on IObservable<T>
module Observable = 
    open System
    open System.Windows.Threading
        
    /// Observer helper lifted from lib\FSharp.Core\control.fs
    ///
    /// Adapts the abstract Next / Error / Completed members to
    /// IObserver<'a> and tracks whether the observer has stopped.
    /// Once OnError or OnCompleted has been received, every later
    /// notification (of any kind) is ignored, so Error / Completed are
    /// forwarded at most once in total.
    [<AbstractClass>]
    type BasicObserver<'a>() =
        // Set by the first terminal notification; never reset.
        let mutable stopped = false
        /// Called for each value received while not stopped.
        abstract Next : value : 'a -> unit
        /// Called for the first terminal notification if it is an error.
        abstract Error : error : exn -> unit
        /// Called for the first terminal notification if it is completion.
        abstract Completed : unit -> unit
        interface IObserver<'a> with
            member x.OnNext value =
                if not stopped then x.Next value
            member x.OnError e =
                // Both statements belong to the guard: flipping the flag
                // and forwarding must happen together or not at all.
                if not stopped then
                    stopped <- true
                    x.Error e
            member x.OnCompleted () =
                if not stopped then
                    stopped <- true
                    x.Completed ()
    
    /// Tap into a sequence of Observable expressions.
    ///
    /// Returns an observable that calls f on each value before passing
    /// that same value on to the subscriber. If f throws, the exception
    /// is delivered to the subscriber through OnError and the value is
    /// not passed on. Errors and completion from w are relayed as-is.
    let tap f (w:IObservable<_>) =
        { new IObservable<_> with
            member self.Subscribe(downstream) =
                let relay =
                    { new BasicObserver<_>() with
                        member self.Next(value) =
                            // Run the side effect first and remember
                            // whether it failed.
                            let failure =
                                try
                                    f value
                                    None
                                with
                                | e -> Some e
                            match failure with
                            | None -> downstream.OnNext value
                            | Some e -> downstream.OnError e
                        member self.Error(error) =
                            downstream.OnError(error)
                        member self.Completed() =
                            downstream.OnCompleted()
                    }
                w.Subscribe(relay)
        }
   
    /// Invoke Observer function through specified function.
    ///
    /// Returns an observable whose subscribers are never called
    /// directly: each OnNext / OnError / OnCompleted call is wrapped in
    /// a unit -> unit thunk and handed to f, which decides how (and
    /// when) the thunk is run.
    let invoke f (w:IObservable<_>) =
        { new IObservable<_> with
            member self.Subscribe(target) =
                let relay =
                    { new BasicObserver<_>() with
                        member self.Next(value) =
                            f (fun () -> target.OnNext value)
                        member self.Error(error) =
                            f (fun () -> target.OnError(error))
                        member self.Completed() =
                            f (fun () -> target.OnCompleted())
                    }
                w.Subscribe(relay)
        }
 
    /// Delay execution of Observer function.
    ///
    /// Each notification is wrapped in an async workflow that sleeps for
    /// the given number of milliseconds and then runs the observer
    /// callback; the workflow is started with Async.Start.
    let delay milliseconds (observable:IObservable<'a>) =
        let runLater callback =
            let workflow =
                async {
                    do! Async.Sleep(milliseconds)
                    do callback ()
                }
            Async.Start workflow
        observable |> invoke runLater
     
    /// Execute Observer function on Dispatcher thread
    /// <remarks>For WPF and Silverlight</remarks>
    let onDispatcher (observable:IObservable<'a>) =
        // Dispatcher.CurrentDispatcher is read once, when onDispatcher
        // is called, and reused for every notification afterwards.
        let target = Dispatcher.CurrentDispatcher
        let post callback =
            target.BeginInvoke(Action(fun () -> callback ())) |> ignore
        observable |> invoke post

 

To wrap up, Matthew Podwysocki’s Time Flies Like An Arrow in F# and the Reactive Extensions for .NET redux (Note: also worth checking Brian McNamara’s version):

module Test =  
    open System
    open System.Windows
    open System.Windows.Controls
    open System.Windows.Input
    open System.Windows.Media
    open System.Windows.Threading           
        
    /// Returns the (X, Y) pair of the mouse position that args reports
    /// relative to the given element.
    let getPosition (element : #UIElement) (args : MouseEventArgs) =
        let relative = args.GetPosition(element)
        relative.X, relative.Y
     
    /// Window showing one TextBlock per character of a fixed message.
    /// Every block subscribes to the MouseEventArgs happening and, after
    /// a per-character delay, repositions itself at the reported mouse
    /// position (shifted right by 10 units per character index).
    type TimeFliesWindow(happenings:IHappenings) as this =
        inherit Window()

        let canvas =
            Canvas(Width=800.0, Height=400.0, Background=Brushes.White)
        do this.Content <- canvas

        /// Creates the block for one character and wires it to the
        /// mouse movement happening.
        let attachLetter index (letter:char) =
            let block =
                TextBlock(Width=20.0,
                          Height=30.0,
                          FontSize=20.0,
                          Text=string letter,
                          Foreground=Brushes.Black,
                          Background=Brushes.White)
            canvas.Children.Add(block) |> ignore
            // Moves this character's block to the given position.
            let moveTo (x, y) =
                Canvas.SetTop(block, y)
                Canvas.SetLeft(block, x + float (index * 10))
            // Consume mouse movement happenings
            happenings.ObtainHappening<MouseEventArgs>()
            |> Observable.map (getPosition canvas)
            |> Observable.tap (fun p -> Console.WriteLine p)
            |> Observable.delay (index * 100)
            |> Observable.onDispatcher
            |> Observable.subscribe moveTo
            |> ignore

        do "F# can react to second class events!"
            |> Seq.iteri attachLetter
              
    /// Repository shared by the publisher below and by the window's
    /// subscribers.
    let happenings = new Happenings() :> IHappenings
    /// The demo window; the title now reads "flies" (it was misspelled
    /// "files").
    let win = TimeFliesWindow(happenings,Title="Time flies like an arrow")

    do  // Publish mouse movement event happenings: every MouseMove raised
        // by the window is forwarded to the MouseEventArgs happening.
        let happen = happenings.ObtainHappening<MouseEventArgs>()
        win.MouseMove
        |> Observable.subscribe happen.Trigger |> ignore

    // Starts the WPF application with the demo window.
    [<STAThread>]
    do (new Application()).Run(win) |> ignore

Happening.fs (7.12 kb)


Tags:
Categories: .Net | F#
Actions: E-mail | Permalink | Comments (7) | Comment RSSRSS comment feed

Comments

April 14. 2010 01:44

pingback

Pingback from topsy.com

Twitter Trackbacks for
        
        Implementing IObservable and extending Observable
        [trelford.com]
        on Topsy.com

topsy.com

April 14. 2010 05:56

pingback

Pingback from alvinashcraft.com

Dew Drop – April 14, 2010 | Alvin Ashcraft's Morning Dew

alvinashcraft.com

April 15. 2010 02:43

trackback

Daily tech links for .net and related technologies - Apr 15-18, 2010

Daily tech links for .net and related technologies - Apr 15-18, 2010 Web Development Guarding against

Sanjeev Agarwal

April 16. 2010 11:16

trackback

F# Discoveries This Week 04/16/2010

It’s finally here.  After years of work, F# 2.0 is out and ready to change the way you program in

Rick Minerich's Development Wonderland

May 5. 2010 04:44

pingback

Pingback from shinguyen.net

Daily tech links for .net and related technologies – Apr 15-18, 2010 | OOP - Object Oriented Programing

shinguyen.net

May 20. 2010 10:30

pingback

Pingback from 136.defutbolazo.com

1971 Ford F 150 F100, 1gb Pc2100 Ddr

136.defutbolazo.com

December 2. 2010 00:04

trackback

Fog Index

Fog Index

Phillip Trelford's blog... evidently

Add comment


(Will show your Gravatar icon)

  Country flag

biuquote
  • Comment
  • Preview
Loading