91

I am chaining async operations using RxJava, and I'd like to pass some variable downstream:

Observable
   .from(modifications)
   .flatMap( (data1) -> { return op1(data1); })
   ...
   .flatMap( (data2) -> { 
       // How can I access data1 here?
       return op2(data2);
   })

It seems like a common pattern but I couldn't find information about it.

Vadim Kotov
Julian Go

9 Answers

70

The advice I got from the Couchbase forum is to use nested observables:

Observable
    .from(modifications)
    .flatMap( (data1) -> { 
        return op1(data1)
            ...
            .flatMap( (data2) -> { 
                // I can access data1 here
                return op2(data2);
            });
    });

EDIT: I'll mark this as the accepted answer as it seems to be the most recommended. If your processing is too complex to nest everything you can also check the solution with function calls.
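To show why nesting works, here is a minimal, dependency-free sketch. It uses `java.util.stream.Stream.flatMap` as a stand-in for RxJava's `flatMap` (an assumption made only so the example runs on a plain JDK), and `op1`/`op2` are hypothetical placeholders. The point is the closure: the inner lambda is written inside the outer one, so `data1` is still in scope.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NestedFlatMapSketch {
    // Hypothetical stand-in for op1: expands one input into two intermediate values.
    static Stream<String> op1(String data1) {
        return Stream.of(data1 + "-a", data1 + "-b");
    }

    // Hypothetical stand-in for op2: produces one result per intermediate value.
    static Stream<String> op2(String data2) {
        return Stream.of(data2.toUpperCase());
    }

    static List<String> run(List<String> modifications) {
        return modifications.stream()
            .flatMap(data1 ->
                op1(data1)
                    // data1 is visible here because this lambda is nested in the outer one.
                    .flatMap(data2 -> op2(data2).map(result -> data1 + " -> " + result)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("x", "y")));
    }
}
```

The same shape should carry over to an RxJava chain, since only lambda scoping is involved, not any particular operator.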

Max
Julian Go
20

Another possibility is to map the result of op1 to an org.apache.commons.lang3.tuple.Pair that contains the variable and pass that along:

Observable
   .from(modifications)
   .flatMap( (data1) -> {
       return op1(data1).map( obj -> { return Pair.of(data1,obj); });
   })
   ...
   .flatMap( (dataPair) -> { 
       // data1 is dataPair.getLeft()
       return op2(dataPair.getRight());
   })

It works but it feels a bit uncomfortable to have variables hidden inside a Pair/Triple/... and it gets very verbose if you use the Java 6 notation.

I wonder if there is a better solution, maybe some RxJava operator could help?
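For reference, a self-contained sketch of the same idea without the commons-lang dependency. It assumes Java 9+ and uses `Map.entry` in place of `Pair.of`, with `java.util.stream` standing in for RxJava so it runs on a plain JDK; `op1` and `op2` are hypothetical placeholders.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PairCarrySketch {
    // Hypothetical stand-in for op1: derives an intermediate value from the input.
    static Stream<Integer> op1(String data1) {
        return Stream.of(data1.length());
    }

    // Hypothetical stand-in for op2: turns the intermediate value into a result.
    static Stream<String> op2(int data2) {
        return Stream.of("len=" + data2);
    }

    static List<String> run(List<String> modifications) {
        return modifications.stream()
            // Pair every op1 result with the input that produced it.
            .flatMap(data1 -> op1(data1).map(obj -> Map.entry(data1, obj)))
            // Later stage: getKey() is data1, getValue() is the op1 result.
            .flatMap(pair -> op2(pair.getValue()).map(r -> pair.getKey() + ": " + r))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("ab", "cde")));
    }
}
```

A small named class with meaningful field names would likely read better than a generic pair once more than two values have to travel together.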

Julian Go
  • 3
    I don't think there's anything wrong with doing this, but whenever I feel I need to reach for the Pair class, I also feel I'm doing something wrong. The amount of times I've used it, and then re-factored it out later once I've got a better understanding on my domain. – Will Feb 06 '15 at 13:13
  • This is the solution that came to me, though it would create a ton of garbage making `Pair` instances just to keep `data1` next to `obj`. I'm wondering if a `combine` would work. – scorpiodawg Jul 15 '15 at 04:06
8

flatMap can take a second argument that combines each source item with the items emitted for it:

Observable.just("foo")
                .flatMap(foo -> Observable.range(1, 5), Pair::of)
                // The pair is (source item, inner item): getFirst() is "foo", getSecond() is the range value
                .subscribe(pair -> System.out.println("Result: " + pair.getSecond() + " Foo: " + pair.getFirst()));

source: https://medium.com/rxjava-tidbits/rxjava-tidbits-1-use-flatmap-and-retain-original-source-value-4ec6a2de52d4

Sisyphus
6

One possibility would be to use a function call:

private static Observable<T> myFunc(final Object data1) {
    return op1(data1)
        ...
        .flatMap( (data2) -> { 
            // I can access data1 here
            return op2(data2);
        });
}

Observable
   .from(modifications)
   .flatMap( (data1) -> { return myFunc(data1); })

However, correct me if I'm wrong, but this doesn't feel like the reactive-programming way of doing it.

Julian Go
3

There is a library that simplifies such call chains:

https://github.com/pakoito/Komprehensions

Adding as Gradle dependency:

implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
implementation 'com.github.pakoito.Komprehensions:komprehensions-rx2:1.3.2'

Usage (Kotlin):

val observable = doFlatMap(
    { Observable.from(modifications) },
    { data1 -> op1(data1) },
    { data1, data2 -> op2(data2) },
    { data1, data2, data3 -> op3(data1, data2, data3) }
)
0x384c0
1

I know this is an old question, but with RxJava 2 and lambdas you can use something like:

Observable
    .from(modifications)
    .flatMap((Function<Data1, ObservableSource<Data2>>) data1 -> {
        // Get the data2 observable
        return Observable.just(new Data2());
    }, Pair::of)

In the next operator of the chain (flatMap/map), the item you receive is the pair (data1, data2).

Ndivhuwo
0

The solutions in this thread work, but for complex chains they make the code difficult to read. I had to pass multiple values, so I created a private class holding all the parameters. I find the code more readable this way:

private class CommonData {
   private String data1;
   private String data2;

   // getters and setters
}
...
final CommonData data = new CommonData();
Observable
   .from(modifications)
   .flatMap( (data1) -> { 
       data.setData1(data1);
       return op1(data1); 
   })
   ...
   .flatMap( (data2) -> { 
       data2 = data.getData1() + "data 2... ";
       data.setData2(data2);
       return op2(data2);
   })

hope it helps

josesuero
  • 2
    Unfortunately this is probably not thread-safe (depending on your code RxJava will run computation on multiple threads) – Julian Go Mar 27 '17 at 08:38
  • 1
    i'm confused, why would a private,non-static variable be thread-"unsafe" – josesuero Mar 28 '17 at 21:00
  • 1
    If RxJava creates multiple threads to flatmap() your data, the CommonData instance will be shared among the threads even if is not static. (This should be testable with some logging that displays the current thread and the CommonData values) – Julian Go Mar 29 '17 at 14:08
  • I need to find another solution because if you have to do something like: if (boolean) return observer1 else return observer2; you would have to flatmap on both which would be.. bad – josesuero Apr 02 '17 at 18:43
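To make the concern in these comments concrete, here is a small deterministic sketch of how state kept outside the pipeline can go stale. It is an analogy, not RxJava itself: it uses a sequential `java.util.stream` pipeline, and `sorted()` plays the role of any stage that lets upstream run ahead of downstream (in RxJava that could be concurrency or buffering, which is an assumption about your particular chain). The `holder` array is a hypothetical stand-in for the shared `CommonData` instance.

```java
import java.util.List;
import java.util.stream.Collectors;

class SharedHolderSketch {
    static List<String> run(List<String> modifications) {
        // State kept outside the pipeline, shared by every item.
        final String[] holder = new String[1];
        return modifications.stream()
            .map(data1 -> {
                holder[0] = data1;          // each item overwrites the previous one
                return data1.length();
            })
            // sorted() consumes every upstream item before emitting anything downstream.
            .sorted()
            // By now holder[0] contains whatever the last upstream item wrote.
            .map(data2 -> holder[0] + ":" + data2)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Every result is labelled with "cc", the last input, not with its own input.
        System.out.println(run(List.of("a", "bbb", "cc")));
    }
}
```

Carrying `data1` inside the emitted item (nesting, a pair, or a result selector) avoids this because each item keeps its own copy.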
0

You can use the resultSelector, the second parameter of flatMap (BiFunction<? super T, ? super U, ? extends R> resultSelector). It receives both the source item and the item emitted by the inner observable, so you can choose which result to return.
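A rough sketch of what a result selector does, written against `java.util.stream` so it runs without RxJava. `flatMapWith` is a hypothetical helper that only mimics the two-argument `flatMap` described above (with simplified generic bounds); it is not part of any library.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ResultSelectorSketch {
    // Hypothetical helper: expand each source item with mapper, then combine the
    // source item with every inner item through resultSelector.
    static <T, U, R> Stream<R> flatMapWith(
            Stream<T> source,
            Function<T, Stream<U>> mapper,
            BiFunction<T, U, R> resultSelector) {
        return source.flatMap(t -> mapper.apply(t).map(u -> resultSelector.apply(t, u)));
    }

    static List<String> run() {
        return flatMapWith(
                Stream.of("foo"),
                foo -> Stream.of(1, 2, 3),
                // The selector sees both the original item and the inner item.
                (foo, n) -> foo + n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Returning a pair from the selector keeps both values; returning only one of them, or a merged object, is equally possible.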

-4

You can use a "global" variable to achieve this:

 Object[] data1Wrapper = new Object[]{null};
 Object[] data2Wrapper = new Object[]{null};
 Observable
    .from(modifications)
    .flatMap(data1 -> {
        data1Wrapper[0] = data1;
        return op1(data1);
     })
      ...
    .flatMap(data2 -> { 
        // data1 is available here via data1Wrapper[0]
        Object data1 = data1Wrapper[0];
        data2Wrapper[0] = data2;
        return op2(data2);
     })
happyyangyuan
  • 5
    This is bad advice, don't store data outside of the stream, use SwitchMap or similar approach to pass data. This is antipattern. – lsrom Feb 09 '20 at 11:26