2

I have a question regarding how to configure an c# Reactive ReplaySubject so that it exhibits the following behavior:

  1. When OnNext() is called, and there is no subscriber yet, the item handed to OnNext() should be cached inside the ReplaySubject.
  2. When there is at least one cached item, and a consumer subscribes to the ReplaySubject, it receives all cached items. After an Item is received, it should be removed from the ReplaySubject's cache and not returned anymore after a second consumer subscribes.
  3. When two or more consumers subscribed to the ReplaySubject before an item is added. All consumers should get the item that is passed to OnNext() when it is called. (This is not super important, it would also be ok if only the first or last subscriber gets the item)

Within the ReplaySubject's constructor, I can only find parameters that constraint the time an item is retained or the amount of items that are retained.

Here a little bit more context as to why I want to achieve the described behavior. I am working on a mobile phone app that communicates with a bluetooth device. The bluetooth device can send notifications which the app receives (BT Gatt Notification). After the app receives a bunch of notifications, it merges those notification into a message which is passed to the ReplaySubject and then processed by the subscribers.

Any hints regarding how to achieve the behavior described above or an alternative solution that is more fit to solve my use case are highly appreciated.

user2074945
  • 537
  • 2
  • 8
  • 22
  • Related: [How can I clear the buffer on a ReplaySubject?](https://stackoverflow.com/questions/28945061/how-can-i-clear-the-buffer-on-a-replaysubject) – Theodor Zoulias Dec 07 '20 at 16:32

1 Answers1

0

There's a couple fundamental problems with this:

  1. Rx is designed as a pub/sub system, with observables ignorant of their observers. The one exception to this is that observables don't 'broadcast' if nobody's listening. Leading to the second problem...
  2. Replay /ReplaySubject (see edit) require a subscriber to function. If there's zero subscribers (as you describe), then nothing with be cached.

The second problem is easier to overcome, just have a dummy subscriber to the ReplaySubject. This just leaves you with the first problem, of tracking subscriber counting, and then switching based on that.

I'm trying to mock-up something to track subscribers but running in to issues. Unless or until I get that working, hopefully this gives you some color on the depth of your problem.


EDIT: ReplaySubject doesn't require a dummy subscriber. .Replay() though does.

Shlomo
  • 14,102
  • 3
  • 28
  • 43
  • Thanks for pointing out the issues. However, as of my understanding, values published to a ReplaySubject are actually cached if there is no observer. Please take a look at this link http://www.introtorx.com/content/v1.0.10621.0/02_KeyTypes.html (you would have to scroll down to the ReplaySubject Section). If I understand the example correctly, it does cache the first published value even tho there is no subscriber? – user2074945 Mar 26 '18 at 17:06
  • I'm sorry, you're correct. `ReplaySubject` caches without a subscriber. However, `.Replay()` or `.Multicast(new ReplaySubject)` does require a subscriber to cache. `.Replay` is more commonly used as it chains in functions better. – Shlomo Mar 26 '18 at 17:25