I have a stream of points and would like to combine each two consecutive points in order to draw a line.

public class MyPoint
{
    public int X { get; set; }
    public int Y { get; set; }
}

I am looking for something that combines the functionality of Aggregate and Select, meaning that I would like to later subscribe either to a complex type combining the two points, or to receive an aggregation as a parameter to my observer's OnNext delegate.

Something like:

    pointObservable.Subscribe((prev, curr) => { }); 

or

    pointObservable.Subscribe((myLineStruct) => { }); 

Sample to build on:

  List<MyPoint> points = new List<MyPoint>();

  for (int i = 0; i < 10; i++)
  {
       points.Add(new MyPoint{ X = i , Y = i * 10});
  }

  IObservable<MyPoint> pointObservable = points.ToObservable();

After trying two solutions, I came across some issues.

First off, here is my actual stream:

observable  // stream of 250 points arriving every interval
     .Take(_xmax + 10)   // for test purposes, take only the graph + 10
     .Select(NormalizeSampleByX) // normalize X (override the real X with the display X)
     .Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)  // returns current only if current.X > prev.X
     .DistinctUntilChanged() // remove all redundant repeats of the previous point

     // here I end up with a stream of normalized points

     .Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr})
             // Dmitry Ledentsov's addition
            .Subscribe(res =>
            {
               Debug.WriteLine(" {0} {1}  , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);   
            });

With Dmitry's addition I get the following result:

0 862  , 252 -21 
1 888  , 253 -24 
2 908  , 254 -28 
3 931  , 255 -31 
4 941  , 256 -35 
5 890  , 257 -38 
6 802  , 258 -41 
7 676  , 259 -44 
8 491  , 260 -48 
9 289  , 261 -51 
10 231  , 262 -55 

@Enigmativity's suggestion:

 observable.Take(_xmax + 10)
            .Select(NormalizeSample)
            .Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)
            .DistinctUntilChanged()
            .Publish(obs => obs.Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr}))
            .Subscribe(res =>
            {
               Debug.WriteLine(" {0} {1}  , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);   
            });

results in:

 59 862  , 1 -21 
 60 867  , 2 -24 
 61 893  , 3 -28 
 62 912  , 4 -31 
 63 937  , 5 -35 
 64 937  , 6 -38 
 65 870  , 7 -41 
 66 777  , 8 -44 
 67 632  , 9 -48 
 68 444  , 10 -51 
 69 289  , 11 -55  
 ...
 ...
eran otzap

3 Answers

Observable.Scan is the easiest way to fold or compare current and previous items. I blogged on this here with some nice diagrams. Here is the code from that article, with an example specifically with points. The extension method is very flexible though, and it works with any source and result types:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}

So if you have a complex type Delta like this:

public class Delta
{
    public Point P1 { get;set; }
    public Point P2 { get;set; }

    public static Delta Create(Point P1, Point P2)
    {
        return new Delta {
            P1 = P1,
            P2 = P2
        };
    }

    public override string ToString()
    {
        return string.Format("Delta is ({0},{1})", P2.X - P1.X, P2.Y - P1.Y);
    }
}

You can use it as follows:

Subject<Point> ps = new Subject<Point>();

ps.CombineWithPrevious(Delta.Create)
  .Subscribe(d => Console.WriteLine(d));

ps.OnNext(new Point(1,1));
ps.OnNext(new Point(2,2));
ps.OnNext(new Point(2,3));

Assuming Point is a struct (so that default(Point) is (0,0)), your output will be:

Delta is (1,1)
Delta is (1,1)
Delta is (0,1)

Note that default(TSource) is used as the initial "previous" value, so the first result pairs that default with the first item. You can easily modify this to accept an explicit initial value, handle it in the result selector, or skip the first element with .Skip(1); there are lots of options.
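As one illustration of the "skip the first element" option, here is a minimal sketch. The name `Pairwise` is hypothetical and not part of the blog article or of Rx itself; it is the same Scan as above with a `.Skip(1)` added so that the pair built from the seed value is never emitted.

```csharp
using System;
using System.Reactive.Linq;

public static class PairwiseExtensions
{
    // Hypothetical helper: emits (previous, current) pairs, starting with
    // the first two real items of the source.
    public static IObservable<TResult> Pairwise<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TSource, TResult> resultSelector)
    {
        return source
            .Scan(
                Tuple.Create(default(TSource), default(TSource)),
                (previous, current) => Tuple.Create(previous.Item2, current))
            .Skip(1) // drop the tuple that pairs default(TSource) with the first item
            .Select(t => resultSelector(t.Item1, t.Item2));
    }
}
```

With a source of four items this yields three pairs, for example `Observable.Range(1, 4).Pairwise((a, b) => a + "->" + b)` produces "1->2", "2->3" and "3->4".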

James World
  • You've answered the same question earlier. You could have voted this question to close(which I did). But +1 for the nice diagrams :) – Sriram Sakthivel Mar 05 '15 at 09:19
  • @SriramSakthivel I thought a while about it, but decided to modify to work with points and talk about the resultSelector and defaults. I thought the question here was a little more discoverable. It was a close call. – James World Mar 05 '15 at 09:20
  • Works great, thanks. I just added a Skip(1) after the Scan to avoid the first null item; I want to start at (0, Y) - (1, Y) anyway. I'm sure the other guys gave great working answers as well, but I just didn't want to complicate things with my complicated stream. Now I see it was probably unavoidable, since the Zip answer gives me weird and unexpected results. – eran otzap Mar 05 '15 at 11:02

The easiest way is probably zipping the sequence with a shifted original sequence:

var res = pointObservable.Zip(
    pointObservable.Skip(1),
    (p1, p2) => new { A = p1, B = p2 }
);

res.Subscribe(Console.WriteLine);

resulting in

{ A = (0,0), B = (1,10) }
{ A = (1,10), B = (2,20) }
{ A = (2,20), B = (3,30) }
{ A = (3,30), B = (4,40) }
{ A = (4,40), B = (5,50) }
{ A = (5,50), B = (6,60) }
{ A = (6,60), B = (7,70) }
{ A = (7,70), B = (8,80) }
{ A = (8,80), B = (9,90) }

given a ToString method for MyPoint.

Update:

As noted in the comments, the original sequence has to be published before zipping in order to avoid unwanted side effects from subscribing to it twice. Thus, James World's answer using Scan is probably what you should use.

Using James' CombineWithPrevious:

var res = pointObservable
    .CombineWithPrevious((p1, p2) => new { A = p1, B = p2 })
    .Skip(1);

gives the same result,

or a more succinct version by Enigmativity:

var res = pointObservable
    .Publish(po => 
        po.Zip(
            po.Skip(1),
            (p1, p2) => new { A = p1, B = p2 }
        )
    );
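A minimal sketch of why both inputs to Zip need to come from the same sequence. The class `ZipAlignmentDemo` and its methods are hypothetical names for illustration, and the `Select(x => x * 10)` is only a stand-in for a longer operator chain such as the one in the question.

```csharp
using System;
using System.Reactive.Linq;

public static class ZipAlignmentDemo
{
    // Placeholder for a chain of operators applied to the raw source.
    public static IObservable<int> Processed(IObservable<int> raw)
    {
        return raw.Select(x => x * 10);
    }

    // Misaligned: the left input is the processed stream, but the right
    // input is the raw source, so unrelated items get paired.
    public static IObservable<string> Mismatched(IObservable<int> raw)
    {
        return Processed(raw)
            .Zip(raw.Skip(1), (prev, curr) => prev + "," + curr);
    }

    // Aligned: Publish shares a single subscription to the processed
    // stream, and both Zip inputs are derived from that shared stream.
    public static IObservable<string> Aligned(IObservable<int> raw)
    {
        return Processed(raw)
            .Publish(o => o.Zip(o.Skip(1), (prev, curr) => prev + "," + curr));
    }
}
```

For `Observable.Range(0, 4)`, `Mismatched` pairs processed values with raw ones ("0,1", "10,2", "20,3"), while `Aligned` pairs each processed value with its successor ("0,10", "10,20", "20,30").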
Dmitry Ledentsov
  • A pretty good solution, but it's a good idea to `.Publish` the `pointObservable` before doing the zip to avoid any possible side-effects (i.e. caused by double subscriptions). – Enigmativity Mar 05 '15 at 09:14
  • @Enigmativity thanks! Indeed, it might be a good idea. To make it work properly, one would have to know the context it is going to be used. If the sequence is not timed, like in the minimal console example, the subscription would miss all the values – Dmitry Ledentsov Mar 05 '15 at 09:26
  • Note that one of the key advantages of the `Scan` approach is that no publishing is required. – James World Mar 05 '15 at 09:27
  • @DmitryLedentsov - You don't need to do a full `.Publish` with `.Connect`. You can do it like this: `var res = pointObservable.Publish(po => po.Zip(po.Skip(1), (p1, p2) => new { A = p1, B = p2 }));`. All inline and it returns an `IObservable`. – Enigmativity Mar 05 '15 at 10:16
  • This actually didn't work like I expected; see the edit to my question. – eran otzap Mar 05 '15 at 10:20
  • @Enigmativity that one is interesting too, and succinct – Dmitry Ledentsov Mar 05 '15 at 10:22
  • @eranotzap - Of course it didn't work - you used two different observables, one had a bunch of operators after your `observable`, but the other one was just `observable.Skip(1)`. They need to be the same observable. Try doing it with my `.Publish` suggestion above. That should make it work. – Enigmativity Mar 05 '15 at 10:27
  • @Enigmativity I'm trying to understand what you suggested, but I can't seem to get it to work. Could we speak here? http://chat.stackoverflow.com/rooms/7/c – eran otzap Mar 05 '15 at 10:38
  • @eranotzap - After `.DistinctUntilChanged()` put `.Publish(o => o.Zip(o.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr}))`. – Enigmativity Mar 05 '15 at 10:42
  • Judging from the results, it seems like it's combining two streams of nearly 250 points each. I can't seem to understand the results. – eran otzap Mar 05 '15 at 10:49
  • @eranotzap try forking this project to post a full minimal working example: https://github.com/d-led/csharp_travis_example – Dmitry Ledentsov Mar 05 '15 at 12:42
  • Sure, I'll do it later today. – eran otzap Mar 05 '15 at 12:47

You want Scan:

points
    .Scan((LineSegment)null, (prev, point) => new LineSegment(prev == null ? point : prev.End, point))
    .Skip(1) // skip the first line segment which will not be valid
    .Subscribe(lineSegment => ... );
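The snippet above uses a `LineSegment` type without defining it. A minimal sketch of what it might look like, assuming the `MyPoint` class from the question; `LineSegment`'s shape and the `ToLineSegments` helper are hypothetical and only illustrate the Scan approach.

```csharp
using System;
using System.Reactive.Linq;

public class MyPoint
{
    public int X { get; set; }
    public int Y { get; set; }
}

// Hypothetical type: an immutable pair of start and end points.
public class LineSegment
{
    public LineSegment(MyPoint start, MyPoint end)
    {
        Start = start;
        End = end;
    }

    public MyPoint Start { get; private set; }
    public MyPoint End { get; private set; }
}

public static class LineSegmentExtensions
{
    // Each new segment starts where the previous one ended.
    public static IObservable<LineSegment> ToLineSegments(
        this IObservable<MyPoint> points)
    {
        return points
            .Scan(
                (LineSegment)null,
                (prev, point) => new LineSegment(prev == null ? point : prev.End, point))
            .Skip(1); // the first segment starts and ends at the same point
    }
}
```

With the ten sample points from the question, `pointObservable.ToLineSegments()` would yield nine segments, the first running from (0,0) to (1,10).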
Brandon