I have an API available to clients that can be simplified to this:

public class API {
  public void sendEvent(Event e) { /* ... */ }
}

Event instances enter my system whenever a client calls the API (technically over Binder into a Service derivative). They are then processed, filtered and dispatched to other internal components. I don't care about past events, just those available from the time a subscriber subscribes. This seems like a natural fit for the Rx paradigm, which I'm just getting my feet wet with.

I need an Observable that is created once, allows multiple subscribers, and can be fed instances of Event that are then sent through the reactive pipeline to observers. A Subject seems appropriate for what I'm looking to do (in particular, this answer to this question resonated with me).

What do other RxJava users recommend?

scorpiodawg
  • Depends. Do you want to cache the events, or only serve those available from the moment the subscriber has subscribed? – njzk2 Jul 14 '15 at 19:11
  • The latter. I don't care about past events, just events available from the time a subscriber subscribes. – scorpiodawg Jul 14 '15 at 19:12
  • Typically, you'll have to return an Observable created from something that implements `OnSubscribe` (probably the same class as your API, but possibly a delegate of that class). In the `call(Subscriber)` method, you register the new subscriber (add it to a list). Then, whenever `sendEvent` is called, call `onNext` on all the subscribers in your list. Bonus: remove subscribers for which `isUnsubscribed` returns true. – njzk2 Jul 14 '15 at 19:15
  • That was where I was heading, and it seemed like I'd be reimplementing logic that I thought might already exist in RxJava. Doesn't a subject do these things already? Then I'd just call `onNext` on it and it would pass things along to the subscribers... – scorpiodawg Jul 14 '15 at 19:19
  • Possibly `publish` can handle the fan-out for you, but you still have to build the initial bridge between your data and the `publish` yourself. – njzk2 Jul 14 '15 at 19:34

2 Answers

For example, following on my short comment:

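import java.util.ArrayList;
import java.util.List;

import rx.Observable.OnSubscribe;
import rx.Subscriber;

public class API implements OnSubscribe<Event> {
    // RxJava 1.x hands call() a Subscriber<? super Event>, so the list uses the same type
    private final List<Subscriber<? super Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<? super Event> sub : subscribers) {
            sub.onNext(event);
        }
    }

    // Invoked each time someone subscribes to the Observable created from this object
    @Override
    public void call(Subscriber<? super Event> sub) {
        subscribers.add(sub);
    }
}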

Then you probably have an instance somewhere: API api = ...

Your Observable is obtained like so: Observable.create(api). You can then do anything you would normally do with an Observable.

The filtering of the unsubscribed Subscribers is left as an exercise to the reader.
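
One possible shape for that exercise is sketched below. To keep it runnable without RxJava on the classpath, it uses plain Java stand-ins rather than the real types: `Handle`, `ManualHub` and `ManualHubDemo` are hypothetical names invented for this sketch, with `Handle` playing the role of an Rx `Subscriber` and its `isUnsubscribed()` flag. The same pruning step should carry over to the `subscribers` list above, but treat this as an illustration of the idea rather than a drop-in implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for an Rx Subscriber: a callback plus an unsubscribed flag.
final class Handle<T> {
    private final Consumer<T> onNext;
    private volatile boolean unsubscribed;

    Handle(Consumer<T> onNext) { this.onNext = onNext; }

    void unsubscribe() { unsubscribed = true; }
    boolean isUnsubscribed() { return unsubscribed; }
    void deliver(T value) { onNext.accept(value); }
}

// Hypothetical hub playing the role of the API class above.
final class ManualHub<T> {
    // Copy-on-write so that subscribing during a dispatch does not break iteration.
    private final List<Handle<T>> subscribers = new CopyOnWriteArrayList<>();

    Handle<T> subscribe(Consumer<T> onNext) {
        Handle<T> handle = new Handle<>(onNext);
        subscribers.add(handle);
        return handle;
    }

    void sendEvent(T event) {
        // Drop subscribers that have unsubscribed since the last event.
        subscribers.removeIf(Handle::isUnsubscribed);
        for (Handle<T> sub : subscribers) {
            // Re-check: a subscriber may unsubscribe while we are iterating.
            if (!sub.isUnsubscribed()) {
                sub.deliver(event);
            }
        }
    }

    int subscriberCount() { return subscribers.size(); }
}

final class ManualHubDemo {
    public static void main(String[] args) {
        ManualHub<String> hub = new ManualHub<>();
        List<String> seenByA = new ArrayList<>();
        List<String> seenByB = new ArrayList<>();

        Handle<String> a = hub.subscribe(seenByA::add);
        hub.sendEvent("first");
        hub.subscribe(seenByB::add);   // late subscriber: never sees "first"
        a.unsubscribe();
        hub.sendEvent("second");       // a is pruned before delivery

        System.out.println(seenByA);   // [first]
        System.out.println(seenByB);   // [second]
    }
}
```

The demo also shows the behaviour asked for in the question: a subscriber only receives events sent after it subscribed.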

Edit

A little more research shows that PublishSubject should help:

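import rx.Observable;
import rx.subjects.PublishSubject;

public class API {
    private final PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it to everyone currently subscribed
        subject.onNext(event);
    }

    public Observable<Event> getObservable() {
        // asObservable() hides the Subject so callers cannot call onNext themselves
        return subject.asObservable();
    }
}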

This way, you can subscribe to this Observable, and every time an event is sent to API, it is published to all subscribers.

Use like this:

API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
njzk2
Try observable.share(), which under the covers calls .publish().refCount(). It uses only one underlying subscription to the source and gives you the multiple-subscriber behaviour you specified.
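
To make "only one underlying subscription" concrete, here is a rough plain-Java sketch of the ref-counting idea: connect to the source when the first subscriber arrives, fan events out to everyone, and disconnect when the last subscriber leaves. It is not RxJava's implementation, and `Upstream`, `RefCountedShare`, `CountingSource` and `ShareDemo` are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical source we only want to connect to once.
interface Upstream<T> {
    Runnable connect(Consumer<T> sink);  // returns an action that disconnects
}

// Hypothetical ref-counting wrapper, loosely modelled on publish().refCount().
final class RefCountedShare<T> {
    private final Upstream<T> upstream;
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private Runnable disconnect;  // non-null while connected

    RefCountedShare(Upstream<T> upstream) { this.upstream = upstream; }

    synchronized Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            // First subscriber: open the single underlying subscription.
            disconnect = upstream.connect(this::fanOut);
        }
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<T> subscriber) {
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            // Last subscriber left: close the underlying subscription.
            disconnect.run();
            disconnect = null;
        }
    }

    private void fanOut(T value) {
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }
}

// Hypothetical source that counts how often it is connected to.
final class CountingSource<T> implements Upstream<T> {
    int connects;
    private Consumer<T> sink;

    @Override
    public Runnable connect(Consumer<T> sink) {
        connects++;
        this.sink = sink;
        return () -> this.sink = null;
    }

    void emit(T value) {
        if (sink != null) {
            sink.accept(value);  // dropped when nobody is connected
        }
    }
}

final class ShareDemo {
    public static void main(String[] args) {
        CountingSource<String> source = new CountingSource<>();
        RefCountedShare<String> shared = new RefCountedShare<>(source);
        List<String> log = new ArrayList<>();

        shared.subscribe(e -> log.add("one:" + e));
        shared.subscribe(e -> log.add("two:" + e));
        source.emit("x");

        System.out.println(source.connects);  // 1
        System.out.println(log);              // [one:x, two:x]
    }
}
```

Two subscribers share a single connection to the source here, which is the property the answer attributes to share().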

Dave Moten