Sergey Shishkin

on agile software development

Reactive Extensions @DNUGK

Update: I originally forgot to include a usage example for the ExpectedSequence extension method. That is fixed now; see the end of the post.

Previously introduced in the Silverlight Toolkit, Microsoft Reactive Extensions (aka Rx) is now making its way into .NET Framework 4. Albert Weinert gave me an opportunity to demonstrate some of my geeky experiments with Rx at the DNUGK meeting yesterday.

The following API comparison between the well-known .NET iterator pattern and the new Rx observer pattern summarizes the intent of Rx: provide an API for working with events that is similar to that of collections, while preserving the reactive (push-model) nature of events.

That means we can apply the same set-based operations to events that we already apply to collections, e.g. Select, Concat, Join, Where, Merge etc. I must admit at this point that by events I don’t mean .NET events but, more generally, occurrences of an asynchronous nature. In Rx terms, anything that implements IObservable can be treated as an event and can even participate in LINQ queries.
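To make the pull/push comparison tangible, here is a minimal sketch that uses only the IEnumerable<T>/IEnumerator<T> and IObservable<T>/IObserver<T> interfaces from the base class library, with no Rx operators. The names PullVersusPush, RecordingObserver and OneTwoThree are made up for this illustration and are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

public static class PullVersusPush
{
    // Pull model: the consumer drives the loop and asks for each next item.
    public static List<int> Pull(IEnumerable<int> source)
    {
        var received = new List<int>();
        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            while (enumerator.MoveNext())          // "is there another item?"
            {
                received.Add(enumerator.Current);  // "give it to me"
            }
        }
        return received;
    }

    // Push model: the consumer hands over an observer and the producer calls it.
    public static List<string> Push(IObservable<int> source)
    {
        var observer = new RecordingObserver();
        using (source.Subscribe(observer)) { }
        return observer.Log;
    }
}

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical producer that pushes 1, 2, 3 synchronously as soon as someone subscribes.
public sealed class OneTwoThree : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 1; i <= 3; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new NothingToDispose();
    }

    private sealed class NothingToDispose : IDisposable
    {
        public void Dispose() { }
    }
}
```

MoveNext/Current on the pull side correspond to OnNext on the push side, the end of the enumeration corresponds to OnCompleted, and an exception thrown while enumerating corresponds to OnError.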

The following example will make things more concrete:

public class KeySequencer
{
    // Subject is both an observer (keys are pushed in) and an observable (others subscribe).
    private readonly Subject<char> keys = new Subject<char>();
    private bool shouldStop;

    public IObservable<char> Keys
    {
        get { return keys; }
    }

    public void Start()
    {
        // Blocks the calling thread, publishing each key, until Stop() has been called.
        while (!shouldStop)
        {
            var key = Console.ReadKey(false).KeyChar;
            keys.OnNext(key);
        }

        Console.WriteLine("exit");
    }

    public void Stop()
    {
        shouldStop = true;
    }
}

The KeySequencer class publishes every key the user presses as a sequence of IObservable events via its Keys property. Creating an IObservable is in fact very simple: I use the Subject class from Rx, which is an “observable observer”. As an observer it has an OnNext method into which I can push keys, and as an observable it forwards them to everyone who has subscribed down the line. Note that the key events are of type Char. As I said, anything can be an event!
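To show what “observable observer” means in practice, here is a deliberately simplified sketch of such a class. MiniSubject<T> and CollectingObserver<T> are hypothetical names for this illustration; this is not how Rx implements Subject<T>, and the sketch assumes single-threaded use and does not guard against notifications after completion.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Subject<T>: forwards whatever it observes to its own subscribers.
public sealed class MiniSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // Observable side: remember the subscriber and return a handle that removes it again.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Observer side: every notification is forwarded to a snapshot of the subscribers,
    // so that subscribers may subscribe or unsubscribe while they are being notified.
    public void OnNext(T value)
    {
        foreach (var observer in observers.ToArray()) observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var observer in observers.ToArray()) observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in observers.ToArray()) observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that simply stores the values it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();

    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Disposing the object returned by Subscribe ends the subscription, which is the same contract the real Subject follows.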

Well, the more interesting part is what we can do with those events published by the KeySequencer. First of all we can subscribe to them:

var keySequencer = new KeySequencer();

keySequencer.Keys.Subscribe(Console.WriteLine);

keySequencer.Start();

Now every character is written to the console twice: first when it is pressed, and again when the subscribed Console.WriteLine method is called. Still not impressed? What about this:

Func<char, IObservable<char>> pressed =
    x => keySequencer.Keys.Where(key => key == x);
Func<char, IObservable<char>> notPressed =
    x => keySequencer.Keys.Where(key => key != x);
// Until is the operator name in this early Rx build; later Rx releases call it TakeUntil.
Func<char, IObservable<char>> followedBy =
    x => pressed(x).Take(1).Until(notPressed(x));

var exit =
    from e in pressed('e')
    from x in followedBy('x')
    from i in followedBy('i')
    from t in followedBy('t')
    select new Unit();

exit.Subscribe(x => keySequencer.Stop());

Here we define four further event sequences (pressed('e'), followedBy('x'), followedBy('i') and followedBy('t')) and combine them into another event sequence (exit), which fires when the user types “exit”, so we can stop listening for key presses and quit the program.
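It can help to see what the exit query keeps track of, without any Rx operators. The following is a plain synchronous model of that behavior as I understand it, not a translation of the query: every 'e' starts a new partial match, the expected next key advances a partial match, and any other key abandons it (the role Until(notPressed(x)) plays above). PartialMatchTracker is a hypothetical name used only for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical synchronous model of the "expected sequence" detection.
public sealed class PartialMatchTracker
{
    private readonly string expected;

    // Each entry is the index of the next character that one partial match is waiting for.
    private List<int> waiting = new List<int>();

    public PartialMatchTracker(string expected)
    {
        if (string.IsNullOrEmpty(expected))
            throw new ArgumentException("expected must not be empty", "expected");
        this.expected = expected;
    }

    // Feeds one key; returns true when this key completes the expected sequence.
    public bool Push(char key)
    {
        var next = new List<int>();
        bool matched = false;

        foreach (int index in waiting)
        {
            if (expected[index] != key) continue;             // wrong key: abandon this partial match
            if (index + 1 == expected.Length) matched = true; // last character arrived
            else next.Add(index + 1);                         // wait for the following character
        }

        // The first character always starts a fresh partial match.
        if (expected[0] == key)
        {
            if (expected.Length == 1) matched = true;
            else next.Add(1);
        }

        waiting = next;
        return matched;
    }
}
```

Feeding the keys of “exit” one by one returns true on the final 't', whereas “exxit” never matches because the second 'x' abandons the partial match that was waiting for 'i'.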

 

A further logical step could be to generalize the “expected sequence” pattern, of course with LINQ in a nicely functional style:

public static class SequenceExtensions
{
    public static IObservable<T> SelectEqualTo<T>(
        this IObservable<T> source,
        T expected)
    {
        return source.Where(x => Equals(expected, x));
    }

    public static IObservable<T> SelectNotEqualTo<T>(
        this IObservable<T> source,
        T expected)
    {
        return source.Where(x => !Equals(expected, x));
    }

    public static IObservable<T> FollowedBy<T>(
        this IObservable<T> source,
        T expected)
    {
        return SelectEqualTo(source, expected)
            .Take(1).Until(SelectNotEqualTo(source, expected));
    }

    public static IObservable<IEnumerable<T>> ExpectedSequence<T>(
        this IObservable<T> source,
        IEnumerable<T> expectedSequence)
    {
        return expectedSequence
            .Select((item, index) => index == 0 ?
                SelectEqualTo(source, item) :
                FollowedBy(source, item))
            .Aggregate(
                Observable.Return(default(T)),
                (accumulator, item) => accumulator.SelectMany(_ => item))
            .Select(_ => expectedSequence);
    }
}

Now I can turn the “exit” sequence into this:

var exit = keySequencer.Keys.ExpectedSequence("exit");

exit.Subscribe(x => keySequencer.Stop());

And of course it works the same way for any sequence. Just pass a string as the parameter for a character sequence (since String implements IEnumerable<char>), or any other IEnumerable<T> for an IObservable<T> event source of your choice.

So, those were just my first steps in the new world of reactive programming with Rx. And I really liked it!


Written by Sergey Shishkin

11.12.2009 at 15:48

Posted in Presentations
