I have a sequence of stock symbols and a sequence of stock prices. Each time I get a stock symbol (it is guaranteed to be unique), I need to open a 100-millisecond window and process the price for that stock from the stock price sequence. If no price for the stock arrives within those 100 milliseconds, I need to process the stock without a price.
Marble diagram to better visualize the requirement:
Stock : -S1--S2--
Price : ---P1-P2-
Result: -S1---S2P2-
So stock 1 comes in and a 100-millisecond window is opened, but no price arrives for the symbol, hence the result should be just stock 1 (S1).
Then stock 2 comes in and a 100-millisecond window is opened again. There is no price for stock 2 yet, but before the window closes we get the price for stock 2 (P2), hence the result is stock 2 with its price (S2P2).
Prices may arrive in any order, so no assumption can be made about the order of the price sequence.
I've seen this related SO question but can't make it work.
I'm trying to use GroupJoin:
stockSubject
    .GroupJoin(
        stockPriceSubject,
        // Each stock opens a window that closes after 100 ms.
        stock => Observable.Timer(TimeSpan.FromMilliseconds(100)),
        // Each price stays "open" forever.
        price => Observable.Never<Unit>(),
        (stock, stockPrice) =>
        {
            var stockPrices = stockPrice.Where(sprice => sprice.Stock.Equals(stock))
                                        .FirstOrDefaultAsync()
                                        .DefaultIfEmpty();
            return (Stock: stock, StockPrices: stockPrices);
        })
    // ToString() so that a missing price prints "N"; with `?? 'N'` the char is converted to decimal and prints 78.
    .Subscribe(async tuple => WriteLine($"{tuple.Stock} | {(await tuple.StockPrices)?.Price.ToString() ?? "N"}"));
This doesn't work: it loses some prices, and because that happens non-deterministically I can't figure out what's wrong.
Another approach I tried works, but doesn't look optimal:
stockSubject
    .Subscribe(stock =>
    {
        stockPriceSubject
            .Buffer(TimeSpan.FromMilliseconds(100))
            .Take(1)
            .Subscribe(bufferedPrices =>
            {
                var stockPrice = bufferedPrices.FirstOrDefault(price => price.Stock.Equals(stock));
                if (stockPrice == null)
                {
                    Console.WriteLine($"{stock} is w/o price");
                    return;
                }
                Console.WriteLine($"{stockPrice}");
            });
    });
One thing I really don't like about this is that I leave a subscription behind every time a new stock arrives and I subscribe to the buffered prices.
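To make the concern concrete, here is a rough sketch of how the same per-stock idea might be composed without the nested Subscribe, so that the inner subscriptions belong to a single pipeline. It assumes the System.Reactive package and C# 9; the Stock and StockPrice records are simplified stand-ins for the classes shown further down (my assumption, used only to get value equality for free), and I'm not certain this is the idiomatic way or that it has no edge cases around the window boundary.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Simplified stand-ins for the question's classes (assumption: C# 9 records,
// which provide value equality without a hand-written IEquatable).
sealed record Stock(string Symbol);
sealed record StockPrice(Stock Stock, decimal Price);

static class Program
{
    static void Main()
    {
        var stockSubject = new Subject<Stock>();
        var stockPriceSubject = new Subject<StockPrice>();
        var window = TimeSpan.FromMilliseconds(100);

        using var subscription = stockSubject
            .SelectMany(stock => stockPriceSubject
                .Where(price => price.Stock.Equals(stock)) // only this stock's prices
                .Take(1)                                   // the first matching price ends the wait
                .TakeUntil(Observable.Timer(window))       // otherwise stop when the window closes
                .DefaultIfEmpty()                          // emits null if no price arrived in time
                .Select(price => (Stock: stock, Price: price)))
            .Subscribe(result => Console.WriteLine(
                result.Price is null
                    ? $"{result.Stock.Symbol} is w/o price"
                    : $"{result.Stock.Symbol} is traded @ {result.Price.Price}"));

        // Tiny hand-driven scenario: GOOG gets a price inside its window, MSFT never does.
        stockSubject.OnNext(new Stock("MSFT"));
        stockSubject.OnNext(new Stock("GOOG"));
        stockPriceSubject.OnNext(new StockPrice(new Stock("GOOG"), 15m));

        // Keep the process alive until the MSFT window has had time to close.
        Thread.Sleep(300);
    }
}
```

With this shape each inner sequence completes on its own, either on the first matching price or when the timer fires, and disposing the single outer subscription tears down whatever is still pending. I'd expect GOOG to come out with its price and MSFT without one.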
Any clue what's the best way to implement this scenario using Rx?
Relevant classes for stock & stock price
sealed class Stock : IEquatable<Stock>
{
    public Stock(string symbol)
    {
        Symbol = symbol;
    }

    public string Symbol { get; }

    public override string ToString() =>
        $"Stock[{Symbol}]";

    // IEquatable implementation is omitted for the sake of brevity
}

sealed class StockPrice
{
    public StockPrice(Stock stock, decimal price)
    {
        Stock = stock;
        Price = price;
    }

    public Stock Stock { get; }
    public decimal Price { get; }

    public override string ToString() =>
        $"{Stock} is traded @ {Price}";
}
EDIT: adding the test data generator code, as requested.
Every 10 milliseconds a new stock is pushed to the stocks sequence (MSFT -> GOOG -> APPL).
Every 20 milliseconds a new price is pushed to the prices sequence (APPL -> GOOG).
After 1 second the price for MSFT is pushed to the prices sequence.
Expected output:
Once MSFT is pushed to the stocks sequence, a 100-millisecond window is opened for prices. No price for MSFT is pushed to the prices sequence within that window, hence MSFT should be processed without a price (in the result set the price is empty/null).
After GOOG is pushed to the stocks sequence, a 100-millisecond window is opened again. This time a price for GOOG arrives within the window, hence GOOG should be processed with its price (15m).
And finally APPL: the expected output is the same as for MSFT. No price for APPL is pushed within the 100-millisecond window that opens when APPL is pushed to the stocks sequence, so it should be processed without a price. The fact that an APPL price was published before the stock arrived shouldn't affect anything.
var stockSubject = new Subject<Stock>();
var stockPriceSubject = new Subject<StockPrice>();

Observable
    .Interval(TimeSpan.FromMilliseconds(10))
    .Take(3)
    .Subscribe(_ =>
    {
        switch (_)
        {
            case 0:
            {
                var stock = new Stock("MSFT");
                stockSubject.OnNext(stock);
                break;
            }
            case 1:
            {
                var stock = new Stock("GOOG");
                stockSubject.OnNext(stock);
                break;
            }
            case 2:
            {
                var stock = new Stock("APPL");
                stockSubject.OnNext(stock);
                break;
            }
        }
    });

Observable
    .Interval(TimeSpan.FromMilliseconds(20))
    .Take(3)
    .Subscribe(_ =>
    {
        switch (_)
        {
            case 0:
            {
                var stockPrice = new StockPrice(new Stock("APPL"), 10m);
                stockPriceSubject.OnNext(stockPrice);
                break;
            }
            case 1:
            {
                var stockPrice = new StockPrice(new Stock("GOOG"), 15m);
                stockPriceSubject.OnNext(stockPrice);
                break;
            }
        }
    });

Observable
    .Timer(TimeSpan.FromSeconds(1))
    .Subscribe(_ =>
    {
        var stockPrice = new StockPrice(new Stock("MSFT"), 20m);
        stockPriceSubject.OnNext(stockPrice);
    });