
On Windows Phone 7 there is a new version of the BufferWithTimeOrCount extension method for IObservable that returns a "stream of streams" instead of the previous "stream of lists". I'm having difficulty using either the new or the old method, so maybe I just don't understand how it works. My goal is to create a stream that only fires when an existing stream matches a specified time-based pattern during the previous 2 touch events. So far I have created streams for TouchUp and TouchDown (see related question), and in pseudocode I want something like:

//BufferLast2 should contain the last 1 or 2 touch events that occurred in the last 500ms. If no touches occurred this should return an empty set
var BufferLast2 = TouchDown.Merge(TouchUp).BufferWithTimeOrCount(TimeSpan.FromSeconds(0.5), 2);
//Use BufferLast2 to detect Tap (TouchDown then TouchUp occurring in less than 0.5s)
var TouchTap = from touch2 in BufferLast2
               where touch2.Count == 2 && touch2.First().Action == TouchAction.Down && touch2.Last().Action == TouchAction.Up
               select touch2.First(); //returns initial TouchDown event
//Use BufferLast2 to detect Hold (TouchDown with no TouchUp occurring in 0.5s)
var TouchHold = from touch2 in BufferLast2
                where touch2.Count == 1 && touch2.First().Action == TouchAction.Down
                select touch2.First(); //returns initial TouchDown event

When using the "Stable" Microsoft.Phone.Reactive version of Rx that is built into the ROM, calling IObservable<Class>.BufferWithTimeOrCount(...) returns an IObservable<IList<Class>>, which is pretty easy to work with using the standard list operators (as outlined above). For some reason, though, BufferLast2 was always returning two Down events instead of the Down->Up sequence that I expected.

I figured it might be a bug in the code, so I tried adding a reference to the latest version of Rx and used the Observable extensions from C:\Program Files (x86)\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2838.0\WP7\System.Reactive.dll, in which BufferWithTimeOrCount(...) returns an IObservable<IObservable<Class>>. This makes simple filters like Where x.Count == 2 or Where x.First().P == ... much harder to write. I haven't actually figured out how to do a simple filter like x.Count() == 2 on this return value without creating a completely separate subscription or Subject object, which seems way too complex. It's probably a simple error like my last question (all I needed was a Where clause :-P), but it is really driving me bonkers. Any help?

Greg Bray
  • P.S. If you have over 2.5K points help clean up the Rx tags at http://stackoverflow.com/tags/system.reactive/synonyms – Greg Bray Jan 30 '11 at 07:28
  • Where is the new Windows Phone Rx API coming from? Or are you talking about the latest WP-compatible System.Observable release? – Richard Szalay Jan 30 '11 at 09:06
  • The latest version is in the System.Linq namespace from the System.Reactive.dll in Build 1.0.2838.0 12/24/2010 from http://blogs.msdn.com/b/rxteam/archive/2010/12/25/rx-christmas-release-2010-introducing-join-and-groupjoin-operators-and-more.aspx In that assembly BufferWithTimeOrCount has a different signature than the version baked in the Microsoft.Phone.Reactive namespace that is part of the Windows Phone OS Image. You can only use one or the other as per http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-for-windows-phone-7.aspx – Greg Bray Jan 30 '11 at 09:19
  • Any help with how to implement a Where clause on an IObservable<IObservable<T>> without creating a completely separate subscription would be helpful, as I'm lost :-P – Greg Bray Jan 30 '11 at 09:49
  • This helped a little, but I'm still not sure how to do filtering on streams of streams... Time for sleep now. http://channel9.msdn.com/Shows/Going+Deep/Programming-Streams-of-Coincidence-with-Join-and-GroupJoin-for-Rx – Greg Bray Jan 30 '11 at 10:36

2 Answers


Changing the API makes the buffering look more Rx-y and fits with their Window operator implementation (I wouldn't be surprised if Reflector showed the Buffer operators using Window). There are probably a variety of reasons why they changed it. I'm not going to second-guess them, as they're a lot smarter than me!

So here's my stab at a solution. There may be a cleaner way to get what you're after, but I'd probably implement my own extension method to buffer into a list. Maybe something like:

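using System;
using System.Collections.Generic;
using System.Linq;

// The class needs a different name from the method: C# does not allow
// a member to share the name of its enclosing type.
public static class BufferToListExtensions
{
    // Collects every value of the source into one list and emits that list
    // once, when the source completes.
    public static IObservable<IEnumerable<TSource>> BufferToList<TSource>(this IObservable<TSource> source)
    {
        return Observable.CreateWithDisposable<IEnumerable<TSource>>(observer =>
        {
            var list = new List<TSource>();

            return source.Subscribe(list.Add,
                observer.OnError,
                () =>
                {
                    observer.OnNext(list);
                    observer.OnCompleted();
                });
        });
    }
}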

Then something like:

TouchDown.Merge(TouchUp)
   .BufferWithTimeOrCount(TimeSpan.FromSeconds(0.5), 2)
   .Select(bufferedValues => bufferedValues.BufferToList())
   .Subscribe(OnBufferOpen);

private void OnBufferOpen(IObservable<IEnumerable<IEvent<IEventArgs>>> bufferedListAsync)
{
   // Where only builds a query; nothing runs until something subscribes to it.
   bufferedListAsync.Where(list => list.Count() == 2)
      .Subscribe(list => { /* handle the two buffered events here */ });
}

If you want a full explanation of why they changed the API, I suggest asking the question over on the Rx forums on MSDN.

James Hay
  • If you watch the "Going Deep" video posted in the comments above, that is exactly what they are doing. My problem is there aren't a lot of examples of how to work with Windows. Your extension method approach makes sense... I'll give that a try. I think I may end up using a custom window approach, as using BufferWithTimeOrCount creates a lot of empty sets that don't need to be processed. Instead, the window should start when a touchdown occurs and end when a touchup occurs. The stream could then include all the move events in between, and you could use a timeout to detect whether it is a tap or hold event – Greg Bray Jan 30 '11 at 18:50
  • @Greg Bray - Yeah have watched the video. Think it's about the only documentation out there at the moment. – James Hay Jan 30 '11 at 19:27
  • @Greg Bray - Sorry Greg, realised last night (was away from machine) that switching won't work as the subscription will complete after the first buffer is received. I've updated my example so that it will work for every new buffer. – James Hay Jan 31 '11 at 08:31
  • I ended up going with a custom Window instead, so that it now creates windows on every TouchDown to TouchUp event. I used your extension method approach to then convert each window into a List of TouchDown -> TouchMove(s) -> TouchUp that can be analyzed on the completion of each window. I created another variant that returns a list from a partial window using a timeout. So far it's working well. I'll post a link to the code after I clean it up a bit. Thanks for all your help! – Greg Bray Feb 01 '11 at 06:54

The latest release of Rx, v1.0.2856.0, provides both buffers and windows. For the buffers, we restored the original signatures, based on IList. The corresponding window operators will return nested observable sequences.

The Buffer* operators are implemented by composing the corresponding Window* operator with the new ToList extension method, which turns an IObservable<T> into an IObservable<IList<T>>. All a Buffer* operator does is invoke this new ToList operator in a SelectMany selector.
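
The composition described above can be sketched roughly as follows. This is only an illustration, not the actual Rx source: it assumes the operator names `Window`, `ToList` and `SelectMany` from current System.Reactive packages (the release discussed here may expose differently named variants), and `WindowToBufferSketch` and `BufferViaWindow` are hypothetical names made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class WindowToBufferSketch
{
    // Hypothetical helper (not part of Rx): list-based buffering rebuilt
    // from the window operator plus ToList, flattened with SelectMany.
    public static IObservable<IList<T>> BufferViaWindow<T>(
        this IObservable<T> source, TimeSpan timeSpan, int count)
    {
        return source
            .Window(timeSpan, count)                 // IObservable<IObservable<T>>
            .SelectMany(window => window.ToList());  // IObservable<IList<T>>
    }
}
```

Because the result is a stream of lists again, an ordinary filter such as `touches.BufferViaWindow(TimeSpan.FromSeconds(0.5), 2).Where(list => list.Count == 2)` works without a separate subscription per window (`touches` stands in for the merged TouchDown/TouchUp stream).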

  • Cool. Thanks Bart. Thought the old buffering operators may come back in :) – James Hay Feb 12 '11 at 10:46
  • Excellent! I like the option to use windows, but sometimes you really just want the list. Keep up the great work! – Greg Bray Feb 12 '11 at 21:52
  • and welcome to StackOverflow! You might want to check out the http://stackoverflow.com/questions/tagged/system.reactive tag :-J – Greg Bray Feb 12 '11 at 21:57