I have an IObservable of items with a timestamp. I use the Scan method to wrap each item and add a reference to the last valid wrapped item which was received.

IObservable<IWrappedItem> wrappedItems = 
  allItems.Scan(seedItem, 
    (lastWrappedItem, currentItem) => 
      WrapItem(currentItem, lastWrappedItem));

This is the signature of WrapItem:

IWrappedItem WrapItem(Item currentItem, IWrappedItem lastItem);

We needed to change the WrapItem method so that it returns null for invalid items (the invalid items themselves are not null) instead of wrapping them. The seedItem will most probably be null, and the WrapItem method can handle that.

I need to update the way I use Scan with something like this:

IObservable<IWrappedItem> wrappedItems = allItems.Scan(seedItem, (lastWrappedItem, currentItem) => 
{
  IWrappedItem wrappedItem = WrapItem(currentItem, lastWrappedItem);
  if (wrappedItem == null)
  {
    // Do something clever to skip this invalid item
    // Next item should have a reference to lastWrappedItem
  }
  return wrappedItem;
});

How can I implement this behavior without returning null values to the new collection, while keeping the Observable pattern? Is there a different method that I should use instead of "Scan"?

Shahar

2 Answers

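You should be able to simply use the Where method (the link documents the Enumerable version; System.Reactive provides a Where operator with the same shape for IObservable<T>): https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.where?view=net-7.0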

IObservable<IWrappedItem> wrappedItems = allItems.Where(item => item != null).Scan(seedItem, (lastWrappedItem, currentItem) => 
{
  IWrappedItem wrappedItem = WrapItem(currentItem, lastWrappedItem);
  if (wrappedItem == null)
  {
    // Do something clever to skip this invalid item
    // Next item should have a reference to lastWrappedItem
  }
  return wrappedItem;
});
JDChris100
  • This wouldn't work, the received items are not null. The problem is that the result of the WrapItem method could be null. – Shahar Feb 20 '23 at 15:25

EDIT: @TheodorZoulias - Thanks for your important comment. I tested it and saw that two parallel observers do not run concurrently, but the second one receives the first one's final lastWrappedItem object as its seed instead of null. So I wrapped my observable with Defer, as @Enigmativity suggested, and it now works correctly.

I found the answer to my question: I implemented a custom generic Scan method that receives the WrapItem function and ignores the null values it returns. It implements Scan using the Select and Where methods.

This is my updated implementation:

public static IObservable<TAccumulate> ScanObservableAndFilterNulls<TSource, TAccumulate>(this IObservable<TSource> items, TAccumulate seed, Func<TSource, TAccumulate, TAccumulate> wrapItemFunc)
{
  // Defer is needed to protect against multiple observers sharing state:
  // every subscription receives its own instance of the variable `lastWrappedItem`
  return Observable.Defer(() =>
  {
    // use the seed before beginning the scan implementation
    TAccumulate lastWrappedItem = seed;
    // implement the custom Scan method
    return items.Select(item => wrapItemFunc(item, lastWrappedItem))
      .Where(wrappedItem =>
      {
        if (wrappedItem != null)
        {
          // update the lastWrappedItem only when the wrapped item is valid
          lastWrappedItem = wrappedItem;
          return true;
        }
        // skip invalid wrapped items, but keep the reference to the last valid item
        return false;
      });
  });
}

This method can be used like this:

public static IObservable<IWrappedItem> ScanAndWrapItems(IObservable<Item> allItems, IWrappedItem seedItem)
{
  return allItems.ScanObservableAndFilterNulls(seedItem, WrapItem);
}
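
To make the shared-state problem from the EDIT above concrete, here is a small self-contained sketch. It assumes the System.Reactive package. The names DeferDemo, ScanWithSharedState, ScanWithDeferredState and Collect, and the toy rule "even numbers are invalid", are made up for illustration and are not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DeferDemo
{
    // Same Select/Where idea as above, but WITHOUT Observable.Defer (for comparison only).
    // Toy "wrap" rule (an assumption): even numbers are invalid and yield null,
    // odd numbers are appended to the last valid string.
    public static IObservable<string> ScanWithSharedState(IObservable<int> items)
    {
        string last = null; // created once per call, captured by every subscription
        return items
            .Select(item => item % 2 == 0 ? null : (last ?? "") + item)
            .Where(wrapped =>
            {
                if (wrapped == null) return false; // skip invalid items
                last = wrapped;                    // remember the last valid item
                return true;
            });
    }

    // Defer re-runs the factory on every Subscribe, so each subscriber gets a fresh `last`.
    public static IObservable<string> ScanWithDeferredState(IObservable<int> items)
    {
        return Observable.Defer(() => ScanWithSharedState(items));
    }

    // Subscribes and gathers the emitted values. Observable.Range is cold and
    // completes synchronously here, so the list is full when Subscribe returns.
    public static List<string> Collect(IObservable<string> source)
    {
        var results = new List<string>();
        using (source.Subscribe(results.Add)) { }
        return results;
    }

    public static void Main()
    {
        IObservable<int> numbers = Observable.Range(1, 4); // 1, 2, 3, 4

        IObservable<string> shared = ScanWithSharedState(numbers);
        Console.WriteLine(string.Join(",", Collect(shared)));   // 1,13
        Console.WriteLine(string.Join(",", Collect(shared)));   // 131,1313 (state leaked from the first run)

        IObservable<string> deferred = ScanWithDeferredState(numbers);
        Console.WriteLine(string.Join(",", Collect(deferred))); // 1,13
        Console.WriteLine(string.Join(",", Collect(deferred))); // 1,13
    }
}
```

The second subscription to the non-deferred observable starts from the first subscription's last valid item, which is the behavior described in the EDIT. With Defer, each subscription starts from the seed again.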

I didn't benchmark the new method to assess the performance penalty, but I believe it would be slower than the original Scan method...
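
If the captured variable is a concern, a similar result can probably be reached with the built-in Scan alone, by carrying the last valid item inside the accumulator. The following is only a sketch under assumptions: ScanSkippingNulls is a hypothetical name, it needs C# 7 tuples and the System.Reactive package, and it constrains TAccumulate to reference types. It has not been benchmarked either.

```csharp
using System;
using System.Reactive.Linq;

public static class ObservableScanExtensions
{
    // Hypothetical helper (not part of System.Reactive): a Scan variant that
    // drops null results while still passing the last valid result forward.
    public static IObservable<TAccumulate> ScanSkippingNulls<TSource, TAccumulate>(
        this IObservable<TSource> source,
        TAccumulate seed,
        Func<TSource, TAccumulate, TAccumulate> wrapItemFunc)
        where TAccumulate : class
    {
        // LastValid: the last non-null wrapped item (initially the seed).
        // Emitted:   the result for the current item, or null if it was skipped.
        (TAccumulate LastValid, TAccumulate Emitted) initialState = (seed, null);

        return source
            .Scan(initialState, (state, item) =>
            {
                TAccumulate wrapped = wrapItemFunc(item, state.LastValid);
                if (wrapped == null)
                {
                    // Invalid item: keep the previous valid item, emit nothing.
                    return (state.LastValid, (TAccumulate)null);
                }
                return (wrapped, wrapped);
            })
            .Where(state => state.Emitted != null)
            .Select(state => state.Emitted);
    }
}
```

Under these assumptions it would be called the same way, for example allItems.ScanSkippingNulls(seedItem, WrapItem). Because all state lives in Scan's accumulator, which is re-seeded on every subscription, no Defer wrapper should be needed in this variant.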

Shahar
  • The `TAccumulate lastWrappedItem` could be shared by different subscribers. Please take a look at [this](https://stackoverflow.com/questions/64843343/how-to-bind-a-non-disposable-object-with-each-subscription-of-a-cold-observable "How to bind a non-disposable object with each subscription of a cold observable?") question. – Theodor Zoulias Feb 20 '23 at 20:46
  • @TheodorZoulias is right. Always wrap state in an `Observable.Defer(...)`. – Enigmativity Feb 22 '23 at 12:04