Reactive Extensions (Rx) is now Open Source

Today, Microsoft Open Technologies, Inc., is open sourcing Rx. Its source code is now hosted on CodePlex:

  • Reactive Extensions:
    • Rx.NET: The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.
    • RxJS: The Reactive Extensions for JavaScript (RxJS) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in JavaScript which can target both the browser and Node.js.
    • Rx++: The Reactive Extensions for Native (RxC) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
  • Interactive Extensions
    • Ix: The Interactive Extensions (Ix) is a .NET library which extends LINQ to Objects to provide many of the operators available in Rx but targeted for IEnumerable<T>.
    • IxJS: An implementation of LINQ to Objects and the Interactive Extensions (Ix) in JavaScript.
    • Ix++: An implementation of LINQ for Native Developers in C++.

From Scott Hanselman:

“GitHub for Windows uses the Reactive Extensions for almost everything it does, including network requests, UI events, managing child processes (git.exe). Using Rx and ReactiveUI, we’ve written a fast, nearly 100% asynchronous, responsive application, while still having 100% deterministic, reliable unit tests. The desktop developers at GitHub loved Rx so much, that the Mac team created their own version of Rx and ReactiveUI, called ReactiveCocoa, and are now using it on the Mac to obtain similar benefits.” – Paul Betts, GitHub

Getting Started with Reactive Extensions (part 4)

This is the fourth and final post in the series about RX.

Demo – Client Server Async events

Consider the following situation. I have a WCF server and a Windows (or web) client. The client dispatches service requests to the server, and the server returns an asynchronous response to each one. When the user dispatches many requests for the same data in quick succession, I only want to get one response from the server. As you can see in the image below, I get only one answer for all my calls.

image

Let’s look at the client code (the constructor of the main window in this case):

public MainWindow()
{
    InitializeComponent();
    DataContext = _vm = new MyViewModel();

    //create an instance of my WCF service
    _service = new MyService.MyServiceClient("BasicHttpBinding_IMyService");

    //subscribe to the button click event through RX (instead of using myButton.Click += handler)
    var buttonClick = Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(h => myButton.Click += h, h => myButton.Click -= h);
    buttonClick.Subscribe(s =>
    {
        _vm.ClientResponse.Add(_counter.ToString());
    });

    //observe my async service method (AddOne)
    var myAsyncServiceCall = Observable.FromAsyncPattern<int /*service input*/, int /*service output*/>(_service.BeginAddOne, _service.EndAddOne);

    //subscribe
    buttonClick
        .Select(s => _counter) //we don't care about s (the click event data), we want to "work" on _counter instead
        .Throttle(TimeSpan.FromSeconds(1)) //wait for 1 second to see that nothing else happened
        .SelectMany(t => myAsyncServiceCall(t).TakeUntil(buttonClick)) //call the service, dropping the response if the button is clicked again first
        .DistinctUntilChanged() //ignore a response equal to the previous one
        .ObserveOnDispatcher() //work on the UI thread
        .Subscribe(s => //what to do with the result
        {
            _counter = s;
            _vm.ServerResponse.Add(s.ToString());
        });
}

As you can see, I subscribe to the button click event through RX. FromEventPattern does not hold a weak reference to the handler; what it gives me is a subscription that detaches the handler when it is disposed, which comes in handy when working with view models, where a handler that is never removed can cause a memory leak. I didn’t have to do it this way, I could just “+=” the button click event, but this is a demo of RX, hence the event subscription.
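To make the lifetime of such an event subscription concrete, here is a minimal sketch. It assumes the System.Reactive package is referenced; FakeButton and FromEventPatternSketch are hypothetical names made up for the illustration, not part of the demo project. It suggests that the handler stays attached until the subscription is disposed, and is detached at that point.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical event source standing in for the WPF button.
public sealed class FakeButton
{
    public event EventHandler Click;
    public void RaiseClick() => Click?.Invoke(this, EventArgs.Empty);
    public bool HasHandlers => Click != null;
}

public static class FromEventPatternSketch
{
    // Returns how many clicks were observed and whether a handler is still attached.
    public static (int Observed, bool StillAttached) Run()
    {
        var button = new FakeButton();
        var observed = 0;

        IDisposable subscription = Observable
            .FromEventPattern<EventHandler, EventArgs>(
                h => button.Click += h,   // runs when the observable is subscribed
                h => button.Click -= h)   // runs when the subscription is disposed
            .Subscribe(_ => observed++);

        button.RaiseClick();      // observed
        subscription.Dispose();   // detaches the handler
        button.RaiseClick();      // nobody is listening any more

        return (observed, button.HasHandlers);
    }
}
```

Keeping the IDisposable returned by Subscribe (for example in a field of the view model) and disposing it when the view model goes away is what prevents the leak.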

Next, I wrap the async pattern (my WCF service Begin and End methods) in an observable. Then I take the button click observable and project each click to my _counter (the Select call). I Throttle the events for 1 second to make sure the user has stopped clicking the button. Then I use SelectMany to call the server and take its response until (TakeUntil) the button is clicked again; if that happens, the pending response is discarded, and only the response to the request issued after the last click reaches my subscription.
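The SelectMany plus TakeUntil combination can be tried in isolation. The following is a minimal sketch, assuming the System.Reactive package; the subjects stand in for the button clicks and for the pending server responses, and all names are hypothetical. Throttle is left out on purpose so the sketch needs no timers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilSketch
{
    public static List<int> Run()
    {
        var clicks = new Subject<Unit>();         // stand-in for buttonClick
        var pending = new List<Subject<int>>();   // one fake server response per request
        var received = new List<int>();

        clicks
            .SelectMany(_ =>
            {
                var response = new Subject<int>();   // stand-in for myAsyncServiceCall(t)
                pending.Add(response);
                return response.TakeUntil(clicks);   // give up on it if another click arrives
            })
            .Subscribe(received.Add);

        clicks.OnNext(Unit.Default);   // request 0 is sent
        clicks.OnNext(Unit.Default);   // request 1 is sent, request 0 is abandoned

        pending[0].OnNext(100);        // late answer to request 0: ignored
        pending[0].OnCompleted();
        pending[1].OnNext(200);        // answer to the latest request: delivered
        pending[1].OnCompleted();

        return received;               // contains only 200
    }
}
```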

The rest is pretty self-explanatory (you can download the code at the end of this post).

Well, that’s it. This concludes the four posts on getting started with RX. Personally, I have found RX very helpful in many cases. It’s not the answer to everything, but it can certainly help in today’s async world.

There are many emerging frameworks, for instance RX UI and MVVM. I strongly encourage you to evaluate whether you need RX only here and there or want a complete solution built on RX. In many cases you would probably only need a tweak here and there…

Code for this demo

Getting Started with Reactive Extensions (part 3)

This is the third post in the series about RX

Demo – complex collection events

Consider the following window: I have a collection of filters. As a user I can add or remove a filter and set each filter’s value. While the user is editing, I don’t want to be notified of every filter change; I only want to be notified once the user has stopped editing. Also, when a filter is removed, I want its subscription to be removed as well.

image

Again, doing this without RX would mean implementing many timers with a state machine. Here is the RX way:

var throttle = 3; //seconds

var filtersChanged = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
    eh => Filters.CollectionChanged += eh,
    eh => Filters.CollectionChanged -= eh);

var filtersRemoved =
    from change in filtersChanged
    where change.EventArgs.Action == NotifyCollectionChangedAction.Remove
    from filter in change.EventArgs.OldItems.Cast<SingleFilter>()
    select filter;

var filtersAdded =
    from change in filtersChanged
    where change.EventArgs.Action == NotifyCollectionChangedAction.Add
    from filter in change.EventArgs.NewItems.Cast<SingleFilter>()
    select filter;

var filterPropertyChanges =
    from filter in filtersAdded
    from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
        .TakeUntil(filtersRemoved.Where(removed => removed == filter))
    select System.Reactive.Unit.Default;

_rxFilters =
    new[]
    {
        filtersAdded.Select(_ => System.Reactive.Unit.Default),
        filtersRemoved.Select(_ => System.Reactive.Unit.Default),
        filterPropertyChanges,
    }
    .Merge()
    .Throttle(TimeSpan.FromSeconds(throttle))
    .ObserveOnDispatcher() //System.Reactive.Windows.Threading asm
    .Subscribe(ApplyFilter);

First I create an observable over the collection’s change event (filtersChanged), and from it I derive the filter removed (filtersRemoved) and filter added (filtersAdded) streams. This is easily done because I’m using an ObservableCollection, which supplies a CollectionChanged event.

Then I register to the PropertyChanged event of each added object (called SingleFilter). This is a plain object with some properties. Notice the TakeUntil, which basically says: “take property changes until this filter shows up in filtersRemoved”.

The last step takes all my observables and projects their values to a default value (Unit.Default). This is because each of my observables has a different element type, and if I want all of them to feed a single subscription I have to project them to the same type. Then I merge all the events (of the same type now), Throttle again until the user stops changing the filters, observe on the UI thread and subscribe with an action.
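The projection to Unit is easy to try on its own. Here is a minimal sketch, assuming the System.Reactive package; the two subjects and their element types are hypothetical stand-ins for streams that carry different kinds of values.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeUnitSketch
{
    public static int Run()
    {
        var added = new Subject<string>();   // e.g. names of added filters
        var removed = new Subject<int>();    // e.g. ids of removed filters
        var notifications = 0;

        // IObservable<string> and IObservable<int> cannot be merged directly,
        // so both are projected to IObservable<Unit> first.
        new[]
        {
            added.Select(_ => Unit.Default),
            removed.Select(_ => Unit.Default),
        }
        .Merge()
        .Subscribe(_ => notifications++);

        added.OnNext("price");
        removed.OnNext(7);
        added.OnNext("name");

        return notifications;   // 3: one per source event, whatever its original type
    }
}
```

Unit carries no data, so this approach fits when the subscriber only needs to know that something changed, as ApplyFilter does when it re-reads the whole Filters collection.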

Well, that’s it. Here is the complete code for reference or you can download the solution at the end of this post.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel;
using System.Collections.ObjectModel;
using Microsoft.Practices.Prism.Commands;
using System.Collections.Specialized;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Linq.Expressions;
using System.Reflection;
using System.Reactive.Subjects;

namespace RxDemoWpf
{
    public class MainWindowViewModel : INotifyPropertyChanged
    {
        #region Members
        private IDisposable _rxFilters;
        #endregion

        #region Properties
        public ObservableCollection<SingleFilter> Filters { get; set; }
        public ObservableCollection<string> Events { get; set; }
        public DelegateCommand AddFilterCommand { get; set; }
        public DelegateCommand<int?> DeleteFilterCommand { get; set; }
        public DelegateCommand ClearEventsCommand { get; set; }
        #endregion

        #region Ctor
        public MainWindowViewModel()
        {
            Filters = new ObservableCollection<SingleFilter>();
            Events = new ObservableCollection<string>();

            AddFilterCommand = new DelegateCommand(
                () => { Filters.Add(new SingleFilter()); });

            DeleteFilterCommand = new DelegateCommand<int?>(
                (ID) => { Filters.Remove(Filters.Where(f => f.ID == ID).First()); });

            ClearEventsCommand = new DelegateCommand(() => Events.Clear());

            InitReactiveExtension1();
        }
        #endregion

        #region Methods
        private void InitReactiveExtension1()
        {
            var throttle = 3; //seconds

            var filtersChanged = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                eh => Filters.CollectionChanged += eh,
                eh => Filters.CollectionChanged -= eh);

            var filtersRemoved =
                from change in filtersChanged
                where change.EventArgs.Action == NotifyCollectionChangedAction.Remove
                from filter in change.EventArgs.OldItems.Cast<SingleFilter>()
                select filter;

            var filtersAdded =
                from change in filtersChanged
                where change.EventArgs.Action == NotifyCollectionChangedAction.Add
                from filter in change.EventArgs.NewItems.Cast<SingleFilter>()
                select filter;

            var filterPropertyChanges =
                from filter in filtersAdded
                from propertyChanged in Observable.FromEventPattern<PropertyChangedEventArgs>(filter, "PropertyChanged")
                    .TakeUntil(filtersRemoved.Where(removed => removed == filter))
                select System.Reactive.Unit.Default;

            _rxFilters =
                new[]
                {
                    filtersAdded.Select(_ => System.Reactive.Unit.Default),
                    filtersRemoved.Select(_ => System.Reactive.Unit.Default),
                    filterPropertyChanges,
                }
                .Merge()
                .Throttle(TimeSpan.FromSeconds(throttle))
                .ObserveOnDispatcher() //System.Reactive.Windows.Threading asm
                .Subscribe(ApplyFilter);
        }

        private void ApplyFilter(SingleFilter f)
        {
            ApplyFilter(System.Reactive.Unit.Default);
        }

        public void ApplyFilter(System.Reactive.Unit e)
        {
            var res = new StringBuilder();
            foreach (var filter in Filters)
            {
                if (res.Length > 0)
                {
                    res.Append(" And ");
                }
                res.Append("(" + filter.LeftSideText + " " + filter.SelectedOperator + " " + filter.RightSideText + ")");
            }
            Events.Add(res.ToString());
        }
        #endregion

        #region Property Changed
        public event PropertyChangedEventHandler PropertyChanged;

        public void NotifyPropertyChanged(string propName)
        {
            if (PropertyChanged != null)
            {
                PropertyChanged(this, new PropertyChangedEventArgs(propName));
            }
        }
        #endregion
    }

    public class SingleFilter : INotifyPropertyChanged, IEqualityComparer<SingleFilter>
    {
        #region Properties
        public ObservableCollection<string> Operators { get; set; }

        private string _leftSideText;
        public string LeftSideText
        {
            get { return _leftSideText; }
            set { _leftSideText = value; NotifyPropertyChanged("LeftSideText"); }
        }

        private string _rightSideText;
        public string RightSideText
        {
            get { return _rightSideText; }
            set { _rightSideText = value; NotifyPropertyChanged("RightSideText"); }
        }

        private string _selectedOperator;
        public string SelectedOperator
        {
            get { return _selectedOperator; }
            set { _selectedOperator = value; NotifyPropertyChanged("SelectedOperator"); }
        }

        public int ID { get; set; }
        #endregion

        #region Ctor
        public SingleFilter()
        {
            Operators = new ObservableCollection<string>(new string[] { ">", "<", "=" });
            ID = GetHashCode();
        }
        #endregion

        #region Property Changed
        public event PropertyChangedEventHandler PropertyChanged;

        public void NotifyPropertyChanged(string propName)
        {
            if (PropertyChanged != null)
            {
                PropertyChanged(this, new PropertyChangedEventArgs(propName));
            }
        }
        #endregion

        public bool Equals(SingleFilter x, SingleFilter y)
        {
            return x.LeftSideText == y.LeftSideText
                && x.RightSideText == y.RightSideText
                && x.SelectedOperator == y.SelectedOperator;
        }

        public int GetHashCode(SingleFilter obj)
        {
            return obj.GetHashCode();
        }
    }
}

Code for this demo here

Getting Started with Reactive Extensions (part 2)

This is the second post in the series about RX

Demo – simple mouse move

In my previous post I explained what RX is and how to get started with it. Now I will go through some examples.

Consider the following window, where the mouse moves and you want to capture the mouse location. Notice I have no buttons to “catch” or “apply” the mouse position and I just want to track the current hovered item.

clip_image002

Without RX I would have had to set up a timer and manage some sort of state machine to track the changes along the timer ticks. If I wanted to track the mouse only when it stops moving, I would have ended up with two timers or a much more complex state machine. With RX I just have to write the following:

Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
    .Throttle(TimeSpan.FromSeconds(1))
    .ObserveOnDispatcher()
    .Subscribe(ShowColor);

What we did here is create an observable over the mouse move event, Throttle it (wait for 1 second until the mouse stops moving), observe on the dispatcher (UI thread) and, finally, subscribe with a method that shows the result:

if (VisualTreeHelper.HitTest(blueBorder, e.EventArgs.GetPosition(blueBorder)) != null)
{
    MouseColors.Add("Blue");
}
else if (VisualTreeHelper.HitTest(greenBorder, e.EventArgs.GetPosition(greenBorder)) != null)
.
.
.
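To see what Throttle does without waiting on a real clock, here is a minimal sketch that drives it with virtual time. It assumes the System.Reactive package; the subject and the color strings are hypothetical stand-ins for the mouse events, and HistoricalScheduler replaces the default timer only so that the result is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleSketch
{
    public static List<string> Run()
    {
        var clock = new HistoricalScheduler();   // virtual clock that we advance by hand
        var moves = new Subject<string>();       // stand-in for the MouseMove events
        var settled = new List<string>();

        moves
            .Throttle(TimeSpan.FromSeconds(1), clock)
            .Subscribe(settled.Add);

        moves.OnNext("blue");
        clock.AdvanceBy(TimeSpan.FromMilliseconds(300));
        moves.OnNext("green");                   // arrives within 1 second: "blue" is dropped
        clock.AdvanceBy(TimeSpan.FromSeconds(2)); // quiet period passes: "green" is emitted
        moves.OnNext("red");
        clock.AdvanceBy(TimeSpan.FromSeconds(2)); // quiet period passes: "red" is emitted

        return settled;                          // green, red
    }
}
```

In other words, Throttle in Rx.NET emits a value only after the source has been silent for the given time span, which is exactly the "mouse stopped moving" condition.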

Now, we can take this a step further and listen to a particular event for each of the borders instead of just hit-testing where the mouse is:

//observe the controls directly and merge their events

Observable.FromEventPattern<MouseEventArgs>(blueBorder, "MouseMove")
    .Merge(Observable.FromEventPattern<MouseEventArgs>(greenBorder, "MouseMove"))
    .Merge(Observable.FromEventPattern<MouseEventArgs>(redBorder, "MouseMove"))
    .Merge(Observable.FromEventPattern<MouseEventArgs>(yellowBorder, "MouseMove"))
    .Throttle(TimeSpan.FromSeconds(1))
    .ObserveOnDispatcher()
    .Subscribe(ShowColor2);

Notice that I merge different events into one subscription. We can take this even further by propagating only changes, so that moving the mouse within a single border does not raise a new notification (DistinctUntilChanged):

 Observable.FromEventPattern<MouseEventArgs>(blueBorder, "MouseMove")
   .Merge(Observable.FromEventPattern<MouseEventArgs>(greenBorder, "MouseMove"))
   .Merge(Observable.FromEventPattern<MouseEventArgs>(redBorder, "MouseMove"))
   .Merge(Observable.FromEventPattern<MouseEventArgs>(yellowBorder, "MouseMove"))
   .Throttle(TimeSpan.FromSeconds(1))
   .DistinctUntilChanged(e => e.Sender) //new 
   .ObserveOnDispatcher()
   .Subscribe(ShowColor2);
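The effect of the key selector passed to DistinctUntilChanged can be checked with plain values. Here is a minimal sketch, assuming the System.Reactive package; the tuple of border name and position is a hypothetical stand-in for the event sender and its arguments.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DistinctUntilChangedSketch
{
    public static List<string> Run()
    {
        var moves = new Subject<(string Border, int X)>();
        var reported = new List<string>();

        moves
            .DistinctUntilChanged(m => m.Border)   // compare by border only, like e => e.Sender
            .Select(m => m.Border)
            .Subscribe(reported.Add);

        moves.OnNext(("blue", 1));
        moves.OnNext(("blue", 2));    // same border as before: suppressed
        moves.OnNext(("green", 3));
        moves.OnNext(("green", 4));   // suppressed
        moves.OnNext(("blue", 5));    // differs from the previous key: reported again

        return reported;              // blue, green, blue
    }
}
```

Note that only consecutive duplicates are removed, so returning to a border visited earlier is still reported.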

In the next post I’ll have a more complex demo using collections.

 

Code for this demo here

Getting Started with Reactive Extensions (part 1)

This is the first post in the series about RX.

The following presentation is just a kick-start overview (which I gave at an IL alt.net session); detailed code follows in my next post.

What is RX

In computing, reactive programming is a programming paradigm oriented around data flows and the propagation of change. (wikipedia)

RX is…

  1. A set of types representing asynchronous data streams
  2. A set of operators to query asynchronous data streams
  3. A set of types to parameterize concurrency

RX = Observables + LINQ + Schedulers
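The three ingredients can be seen together in a few lines. Here is a minimal sketch, assuming the System.Reactive package; the class name is hypothetical and the immediate scheduler is chosen only to keep the example synchronous.

```csharp
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RxInANutshell
{
    public static IList<int> Run()
    {
        // Observable + scheduler: the numbers 1..6, produced on an explicitly chosen scheduler.
        var numbers = Observable.Range(1, 6, Scheduler.Immediate);

        // LINQ: the same query syntax used with IEnumerable<T>, applied to a stream.
        var query =
            from n in numbers
            where n % 2 == 0
            select n * 10;

        // Collect the whole stream into a list and block until it completes.
        return query.ToList().Wait();   // 20, 40, 60
    }
}
```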

Supported Platforms

  • .Net 3.5 / 4 (WPF)
  • Silverlight 4 / 5
  • WP7
  • XNA (XBOX)
  • JavaScript (RxJs)

image

RX Installation

 

Rx Design Guidelines

here (well composed PDF)

Code right on

next post