I have a sequence of stock symbols and a sequence of stock prices. Every time I get a stock symbol (it is guaranteed to be unique), I need to open a 100 millisecond window and process the price for that stock from the stock price sequence. If the price for the stock does not arrive within those 100 milliseconds, I need to process the stock without a price.

Marble diagram to better visualize the requirement:

Stock : -S1--S2--

Price : ---P1-P2-

Result: -S1---S2P2-

So stock 1 comes in and a 100 millisecond window is opened, but no price arrives for the symbol, hence the result should be just stock 1 (S1).

Then stock 2 comes in, again a 100 millisecond window is opened, and there is no price for stock 2 yet. However, before the 100 millisecond window closes, we get the price for stock 2 (P2), hence the result is stock 2 with its price (S2P2).

Prices may come in any order, so no assumption can be made about the order of the price sequence.

I've seen this related SO question but can't make it work. I'm trying to use GroupJoin:

stockSubject
.GroupJoin(
    stockPriceSubject,
    stock => Observable.Timer(TimeSpan.FromMilliseconds(100)),
    price => Observable.Never<Unit>(),
    (stock, stockPrice) =>
    {            
        var stockPrices = stockPrice.Where(sprice => sprice.Stock.Equals(stock))
                                    .FirstOrDefaultAsync()
                                    .DefaultIfEmpty();
        return (Stock: stock, StockPrices: stockPrices);
    })
.Subscribe(async tuple => WriteLine($"{tuple.Stock} | {(await tuple.StockPrices)?.Price ?? 'N'}"));

This doesn't work, as it loses some prices (this happens non-deterministically, so I can't really figure out what's wrong).

Another approach I tried, while working, doesn't look optimal:

stockSubject
    .Subscribe(stock =>
    {
        stockPriceSubject
            .Buffer(TimeSpan.FromMilliseconds(100))
            .Take(1)
            .Subscribe(bufferedPrices =>
            {
                var stockPrice = bufferedPrices.FirstOrDefault(price => price.Stock.Equals(stock));
                if (stockPrice == null)
                {
                    Console.WriteLine($"{stock} is w/o price");
                    return;
                }

                Console.WriteLine($"{stockPrice}");
            });
    });

One thing I really don't like about this is that I leave a subscription behind every time a new stock arrives, because I subscribe to the buffered prices each time.

Any clue what's the best way to implement this scenario using Rx?

Relevant classes for stock & stock price:

sealed class Stock : IEquatable<Stock>
{
    public Stock(string symbol)
    {
        Symbol = symbol;
    }

    public string Symbol { get; }

    public override string ToString() =>
        $"Stock[{Symbol}]";

    // IEquatable implementation is omitted for the sake of brevity
}

sealed class StockPrice
{
    public StockPrice(Stock stock, decimal price)
    {
        Stock = stock;
        Price = price;
    }

    public Stock Stock { get; }
    public decimal Price { get; }

    public override string ToString() =>
        $"{Stock} is traded @ {Price}";
}

EDIT: adding test data generator code as requested

Every 10 milliseconds a new stock is pushed to the stocks sequence (MSFT -> GOOG -> APPL).

Every 20 milliseconds a new price is pushed to the prices sequence (APPL -> GOOG).

After 1 second the price for MSFT is pushed to the prices sequence.

Expected output:

Once MSFT is pushed to the stocks sequence, a 100 millisecond window is opened for prices. No price for MSFT is pushed to the prices sequence within those 100 milliseconds, hence MSFT should be processed without a price (in the result set the price is empty/null).

After GOOG is pushed to the stocks sequence, a 100 millisecond window is opened again. This time a price for GOOG arrives within the 100 milliseconds, hence GOOG should be processed with its price (15m).

And finally APPL: the expected output here is the same as for MSFT. No price is pushed for APPL within the 100 millisecond window that opens when the stock is pushed to the stocks sequence, so it should be processed without a price. The fact that an APPL price was published before the stock arrived shouldn't affect anything.

var stockSubject = new Subject<Stock>();
var stockPriceSubject = new Subject<StockPrice>();

Observable
   .Interval(TimeSpan.FromMilliseconds(10))
   .Take(3)
   .Subscribe(_ =>
   {
       switch (_)
       {
           case 0:
               {
                   var stock = new Stock("MSFT");
                   stockSubject.OnNext(stock);
                   break;
               }
           case 1:
               {
                   var stock = new Stock("GOOG");
                   stockSubject.OnNext(stock);
                   break;
               }
           case 2:
               {
                   var stock = new Stock("APPL");
                   stockSubject.OnNext(stock);
                   break;
               }
       }
   });

Observable
    .Interval(TimeSpan.FromMilliseconds(20))
    .Take(3)
    .Subscribe(_ =>
    {
        switch (_)
        {
            case 0:
                {
                    var stockPrice = new StockPrice(new Stock("APPL"), 10m);
                    stockPriceSubject.OnNext(stockPrice);
                    break;
                }
            case 1:
                {
                    var stockPrice = new StockPrice(new Stock("GOOG"), 15m);
                    stockPriceSubject.OnNext(stockPrice);
                    break;
                }
        }
    });

Observable
    .Timer(TimeSpan.FromSeconds(1))
    .Subscribe(_ =>
    {
        var stockPrice = new StockPrice(new Stock("MSFT"), 20m);
        stockPriceSubject.OnNext(stockPrice);
    });
Michael

1 Answer

It's hard to test an answer without some test code. I'm also not sure what you want to do with the data downstream. If this answer doesn't suffice, please amend the question with that information.

The solution to what you're asking is, I believe, quite simple:

stocks
    .Select(s => (Stock: s, StockPrices: prices
        .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
        .Where(p => p.Stock == s)
    ));

This subscribes to prices once per stock, which leads to multiple-subscription problems. That can be fixed with Publish:

prices.Publish(_prices => 
    stocks
        .Select(s => (Stock: s, StockPrices: _prices
            .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100)))
            .Where(p => p.Stock == s)
        ))
    );
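If what you want downstream is exactly one result per stock, with or without a price, the inner sequence can be collapsed with `Take(1)` and `DefaultIfEmpty()`. Here is a minimal sketch, not a definitive implementation. Assumptions: `Stock` and `StockPrice` are rewritten as records so that `Equals` compares by value, `WithPriceInWindow` is a hypothetical helper name, and the scheduler parameter exists only so the 100 millisecond timer can be driven by `TestScheduler` from Microsoft.Reactive.Testing.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

var scheduler = new TestScheduler();
var stocks = new Subject<Stock>();
var prices = new Subject<StockPrice>();
var results = new List<string>();

stocks.WithPriceInWindow(prices, TimeSpan.FromMilliseconds(100), scheduler)
      .Subscribe(r => results.Add(r.Price is null
          ? $"{r.Stock.Symbol} without price"
          : $"{r.Stock.Symbol} @ {r.Price.Price}"));

stocks.OnNext(new Stock("MSFT"));                          // MSFT window: 0..100 ms
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(50).Ticks);
stocks.OnNext(new Stock("GOOG"));                          // GOOG window: 50..150 ms
prices.OnNext(new StockPrice(new Stock("GOOG"), 15m));     // inside the GOOG window
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(60).Ticks);  // clock passes 100 ms

results.ForEach(Console.WriteLine);
// GOOG @ 15
// MSFT without price

// Stand-ins for the question's classes (assumption: records for value equality).
public sealed record Stock(string Symbol);
public sealed record StockPrice(Stock Stock, decimal Price);

public static class StockWindowExtensions
{
    // Hypothetical helper: emits exactly one pair per stock.
    // Price is null when no matching price arrives before the window closes.
    public static IObservable<(Stock Stock, StockPrice? Price)> WithPriceInWindow(
        this IObservable<Stock> stocks,
        IObservable<StockPrice> prices,
        TimeSpan window,
        IScheduler scheduler) =>
        prices.Publish(shared =>
            stocks.SelectMany(stock =>
                shared
                    .Where(p => p.Stock.Equals(stock))
                    .Take(1)                                         // first match ends the wait
                    .TakeUntil(Observable.Timer(window, scheduler))  // otherwise stop at window close
                    .Select(p => (StockPrice?)p)
                    .DefaultIfEmpty()                                // null when nothing matched
                    .Select(p => (Stock: stock, Price: p))));
}
```

A stock is emitted as soon as its price shows up, or with a `null` price once its window closes, which is the behaviour in the question's marble diagram.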

Join and GroupJoin won't work as nicely when you get 0 prices for a stock, so I wouldn't recommend them for your scenario. However, if you go back to GroupJoin, you should change Observable.Never to Observable.Empty. Never leaves each price's window open forever, so an old price could be joined with a new stock.
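For illustration, this is roughly what a GroupJoin version could look like with `Observable.Empty` as the price duration. It is only a sketch. Assumptions: `Stock` and `StockPrice` are written as records so that `Equals` compares by value, a `TestScheduler` from Microsoft.Reactive.Testing stands in for real time, and each stock's window is flattened to a single result with `Take(1)` and `DefaultIfEmpty()`.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

var scheduler = new TestScheduler();
var stocks = new Subject<Stock>();
var prices = new Subject<StockPrice>();
var results = new List<string>();

stocks.GroupJoin(
        prices,
        // Each stock stays "open" for 100 ms.
        _ => Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler),
        // Each price closes immediately, so it is only seen by stocks
        // whose window is open at the moment the price arrives.
        _ => Observable.Empty<Unit>(),
        (stock, windowPrices) => windowPrices
            .Where(p => p.Stock.Equals(stock))
            .Take(1)
            .Select(p => (StockPrice?)p)
            .DefaultIfEmpty()   // null when the window closes without a match
            .Select(p => (Stock: stock, Price: p)))
    .Merge()
    .Subscribe(r => results.Add(r.Price is null
        ? $"{r.Stock.Symbol} without price"
        : $"{r.Stock.Symbol} @ {r.Price.Price}"));

prices.OnNext(new StockPrice(new Stock("AAPL"), 10m));  // arrives before the AAPL stock
stocks.OnNext(new Stock("AAPL"));
stocks.OnNext(new Stock("GOOG"));
prices.OnNext(new StockPrice(new Stock("GOOG"), 15m));  // arrives inside the GOOG window
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(150).Ticks);

results.ForEach(Console.WriteLine);

// Stand-ins for the question's classes (assumption: records for value equality).
public sealed record Stock(string Symbol);
public sealed record StockPrice(Stock Stock, decimal Price);
```

With `Observable.Never<Unit>()` in place of `Observable.Empty<Unit>()`, the early AAPL price would be kept around and replayed into the AAPL window that opens later, which is the stale-price problem described above.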


EDIT:

Here's some testing code using Microsoft.Reactive.Testing:

TestScheduler ts = new TestScheduler();
var stockSource = ts.CreateHotObservable<Stock>(
    new Recorded<Notification<Stock>>(10.MsTicks(), Notification.CreateOnNext(new Stock("MSFT"))),
    new Recorded<Notification<Stock>>(20.MsTicks(), Notification.CreateOnNext(new Stock("GOOG"))),
    new Recorded<Notification<Stock>>(30.MsTicks(), Notification.CreateOnNext(new Stock("AAPL")))
);

var priceSource = ts.CreateHotObservable<StockPrice>(
    new Recorded<Notification<StockPrice>>(20.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("AAPL"), 10m))),
    new Recorded<Notification<StockPrice>>(40.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("GOOG"), 15m)))
);


var target = priceSource.Publish(_prices =>
    stockSource
        .Select(s => (Stock: s, StockPrices: _prices
            .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(100), ts))
            .Where(p => p.Stock.Symbol == s.Symbol)
        ))
    );
var observer = ts.CreateObserver<(Stock, IObservable<StockPrice>)>();
target.Subscribe(observer);

var target2 = target.SelectMany(t => t.StockPrices.Select(sp => (Stock: t.Stock, Price: sp)));
var observer2 = ts.CreateObserver<(Stock, StockPrice)>();
target2.Subscribe(observer2);
ts.Start();

observer.Messages.Dump();   //LinqPad
observer2.Messages.Dump();  //LinqPad

This uses the following extension method:

public static class Extensions
{
    public static long MsTicks(this int i)
    {
        return TimeSpan.FromMilliseconds(i).Ticks;
    }
}

For me, this works. The only problem was the missing IEquatable implementation, so I switched from .Where(p => p.Stock == s) to .Where(p => p.Stock.Symbol == s.Symbol). Is that possibly your issue?
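One thing worth checking on your side: for a class, `==` compares references unless the operator is overloaded, so implementing `IEquatable<Stock>` changes what `Equals` does but not what `==` does. A minimal sketch, where the `Equals` and `GetHashCode` bodies are my assumption of what the omitted implementation looks like:

```csharp
#nullable enable
using System;

var a = new Stock("GOOG");
var b = new Stock("GOOG");

Console.WriteLine(a.Equals(b)); // True: Equals compares the symbols
Console.WriteLine(a == b);      // False: == compares references here

sealed class Stock : IEquatable<Stock>
{
    public Stock(string symbol) => Symbol = symbol;

    public string Symbol { get; }

    // Assumed implementation: two stocks are equal when their symbols match.
    public bool Equals(Stock? other) => other is not null && Symbol == other.Symbol;

    public override bool Equals(object? obj) => Equals(obj as Stock);

    public override int GetHashCode() => Symbol.GetHashCode();
}
```

So with your `IEquatable` implementation in place, `.Where(p => p.Stock.Equals(s))` should behave like the `Symbol` comparison, while `p.Stock == s` only would if `Stock` also overloaded `==`.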

Shlomo
  • Thanks for looking into this. Added requested test data generator code with explanation what should be the expected behavior. Tried your code, `StockPrices` is empty always. – Michael Oct 22 '18 at 18:11
  • Updated answer. Let me know. – Shlomo Oct 22 '18 at 23:47
  • Still doesn't work. In the `priceSource` add one more record: `new Recorded<Notification<StockPrice>>(80.MsTicks(), Notification.CreateOnNext(new StockPrice(new Stock("APPL"), 25m)))`. Now result should also print `APPL` stock with price `25` since `APPL` stock was pushed @ `30` ms and spanning 100 millisecond window, price submitted @ `80` ms should have been picked up. – Michael Oct 23 '18 at 17:24
  • Change to "AAPL". :) – Shlomo Oct 23 '18 at 17:33
  • Ooops, sry for sloppiness... It works now, thanks for helping out! – Michael Oct 23 '18 at 18:14