
I have an Observable that executes a network request and emits the data. The Observable has multiple subscribers, and since I don't want to re-execute the network request for each new subscriber, I'm multicasting it with replay().autoConnect(). This works perfectly: my second subscriber gets the data immediately without re-executing the network request, because the replay operator caches the earlier result. The problem is that I don't want this cached data to be modified. For example, some of the subscribers modify the result, and I don't want those modifications reflected in the cached data. So, basically, I don't want to duplicate the network request, but I also need each subscriber to get a deep copy of the cached data. Is this possible with any operator? Or is there an Rx solution for this problem? I hope my question is clear.

I am adding a sample code snippet which shows the issue.

public class Person {
    String name;

    public Person(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}



private List<Person> getPersonList() {
    List<Person> persons = new ArrayList<>();
    persons.add(new Person("Person A"));
    persons.add(new Person("Person B"));
    persons.add(new Person("Person C"));
    return persons;
}

Observable<List<Person>> netWorkCall = Observable.fromCallable(this::getPersonList)
            .replay()
            .autoConnect();

    // first subscription
    netWorkCall.subscribe(persons -> {
        for (Person person : persons) {
            System.out.println(person);
        }

        // output is as follows
        // Person A
        // Person B
        // Person C
    });

    // second subscription, which does some modification
    netWorkCall
            .flatMap(new Function<List<Person>, ObservableSource<List<Person>>>() {
                @Override
                public ObservableSource<List<Person>> apply(@NonNull List<Person> persons) throws Exception {
                    return Observable.fromCallable(() -> persons);
                }
            })
            .subscribe(persons -> {
                for (int i = 0; i < persons.size(); i++) {
                    Person person = persons.get(i);
                    person.name = "Person " + i;
                    System.out.println(person);
                }
                // output is as follows
                // Person 0
                // Person 1
                // Person 2
            });

    // third subscription, again subscribes to the replayed Observable
    netWorkCall.subscribe(persons -> {
        for (Person person : persons) {
            System.out.println(person);
        }

        // Here I need the output to be as follows
        // Person A
        // Person B
        // Person C
        // But what I get is
        // Person 0
        // Person 1
        // Person 2

    });

1 Answer


Suppose you have two subscribers: one wants the data as it is, and the other modifies the data.

Observable<String> obs = apiService.getData()
          .subscribeOn(ioThread())
          .observeOn(mainThread())
          .replay().autoConnect();

The first subscriber gets the data as it is:

obs.subscribe((data) -> { /* do whatever */ });

The second subscriber modifies the data in some way:

obs.map(intenseOperation())
   .subscribe((data) -> {});

This way both subscribers get the data, and you can perform operations specific to one subscriber.
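One caveat, which the comments below also raise: replay() hands every subscriber the same object instance, so a subscriber-specific step only protects the cached value if it works on a copy. Here is a minimal sketch of that idea in plain Java. The copy constructor, the `deepCopy` helper and the class name `DefensiveCopyDemo` are assumptions made for illustration; they are not part of the question's code or of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

class DefensiveCopyDemo {

    // Same shape as the Person class in the question, plus a
    // hypothetical copy constructor added for this sketch.
    static class Person {
        String name;

        Person(String name) {
            this.name = name;
        }

        Person(Person other) {
            this.name = other.name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Hypothetical helper: builds a new list holding a copy of every element,
    // so changes to the result never touch the source list or its elements.
    static List<Person> deepCopy(List<Person> source) {
        List<Person> copy = new ArrayList<>(source.size());
        for (Person person : source) {
            copy.add(new Person(person));
        }
        return copy;
    }

    public static void main(String[] args) {
        // Stands in for the single list instance that replay() would
        // hand to every subscriber.
        List<Person> cached = new ArrayList<>();
        cached.add(new Person("Person A"));
        cached.add(new Person("Person B"));
        cached.add(new Person("Person C"));

        // The "second subscriber" mutates its own copy only.
        List<Person> mine = deepCopy(cached);
        for (int i = 0; i < mine.size(); i++) {
            mine.get(i).name = "Person " + i;
        }

        System.out.println(mine);   // [Person 0, Person 1, Person 2]
        System.out.println(cached); // [Person A, Person B, Person C]
    }
}
```

In the Rx chain, a helper like this could be applied per subscriber, for example `netWorkCall.map(DefensiveCopyDemo::deepCopy)` before `subscribe`, assuming it is adapted to the question's own Person class. The cached list then stays untouched because each mutating subscriber only ever sees its copy.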

In short, keep all the common operations before autoConnect() and perform subscriber-specific tasks in separate streams. Watch this talk to understand multicasting: youtube-link

update

If you want to reduce your API requests, then rather than using the replay() operator, add a custom cache interceptor to your API client; refer to this stackoverflow-answer

pratham kesarkar
  • What could this `intenseOperation()` be? –  Jan 21 '18 at 15:52
  • It can be any function, whatever you are performing; I just gave an example – pratham kesarkar Jan 21 '18 at 15:54
  • That I understood. But if I add a `map` operator before the subscribe, I still have to return a new copy of the result manually, right? The input to the map operator is the cached `replayed` data, I agree, but what reaches the subscriber is whatever I return from `map`, right? –  Jan 21 '18 at 16:01
  • So I will still have to make sure that I return a deep copy instead of the same reference (the replayed data)? Am I correct? –  Jan 21 '18 at 16:02
  • It will only cache data up to the common operations; none of the downstream operations will be cached – pratham kesarkar Jan 21 '18 at 16:03
  • So in my example the map operation is in a different stream. Whenever the observable replays for a new subscriber, it will only give the data fetched from the network, not data manipulated by map – pratham kesarkar Jan 21 '18 at 16:05
  • I tried your approach, but it doesn't seem to work in my case, because replay always gives the same object reference, say `ObjRefA`. The `map` operator takes `ObjRefA` as input, and the second subscriber also gets `ObjRefA`. Say the second subscriber does some modification, `ObjRefA.name = "New Name"`; this affects the cached data of `replay` as well. That means any later subscriber to the `replayed` Observable would still receive `ObjRefA`, but it has already been modified by the second subscriber. –  Jan 21 '18 at 16:46
  • Do you see the issue? –  Jan 21 '18 at 16:47
  • _**it will only give data fetched from the network not manipulated by map**_ That does not happen in my case. The manipulation affects replay as well –  Jan 21 '18 at 16:48
  • I have already watched this video. I have updated the question with a sample that shows my issue. Can you please look into that? –  Jan 21 '18 at 17:09
  • Does the sample describe the issue? Here I could have cloned the Person object, but my actual network call result is a rather big object, and creating a deep copy of it is difficult. Also, please correct me if the sample is wrong –  Jan 21 '18 at 17:13
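
For a large result object where writing copy constructors by hand is impractical, one generic option is to deep-copy through Java serialization. The sketch below assumes that every class in the object graph can implement `Serializable`. The class name `SerializationCopyDemo` and the `deepCopy` helper are hypothetical, and this route is generally slower than hand-written copying.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

class SerializationCopyDemo {

    // Assumption: the model classes can be marked Serializable.
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        String name;

        Person(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Hypothetical helper: writes the object graph to a byte array and reads
    // it back, which yields a fully independent copy of the whole graph.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Deep copy failed", e);
        }
    }

    public static void main(String[] args) {
        // Stands in for the cached network result.
        ArrayList<Person> cached = new ArrayList<>();
        cached.add(new Person("Person A"));
        cached.add(new Person("Person B"));

        // Mutating the copy leaves the cached instance unchanged.
        ArrayList<Person> copy = deepCopy(cached);
        copy.get(0).name = "New Name";

        System.out.println(copy);   // [New Name, Person B]
        System.out.println(cached); // [Person A, Person B]
    }
}
```

The helper is generic, so it works for any serializable result type without per-class copy code. If the model classes can be made immutable instead, no copy is needed at all, since subscribers then cannot modify the cached instance.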