
I'm wrapping legacy code for a library that is not thread-safe. When a client of the library calls an API off the main thread, I need to switch to the main thread, do the work there, and then switch back to the caller's thread to return the results.

I thought I could use the following (this is for Android, but the question is more generic):

internal object TransformCompletableTemporarilySwitchToMainThread : CompletableTransformer {
    override fun apply(upstream: Completable): CompletableSource {
        return upstream
                .observeOn(Schedulers.trampoline())
                .subscribeOn(AndroidSchedulers.mainThread())
    }
}

Is there something like the Schedulers.immediate() of RxJava 1? I know that for testing you can replace Schedulers.immediate() with Schedulers.trampoline(), but from the documentation and from the tests I ran, it looks like Schedulers.trampoline() doesn't have much in common with Schedulers.immediate(). Is there an alternative way to do this?

ADDED

/**
 * Returns a default, shared {@link Scheduler} instance whose {@link io.reactivex.Scheduler.Worker}
 * instances queue work and execute them in a FIFO manner on one of the participating threads.
 * <p>
 * The default implementation's {@link Scheduler#scheduleDirect(Runnable)} methods execute the tasks on the current thread
 * without any queueing and the timed overloads use blocking sleep as well.
 * <p>
 * Note that this scheduler can't be reliably used to return the execution of
 * tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided
 * by RxJava itself but may be found in external libraries.
 * <p>
 * This scheduler can't be overridden via an {@link RxJavaPlugins} method.
 * @return a {@link Scheduler} that queues work on the current thread
 */

What do these two parts mean?

  • Returns a default, shared {@link Scheduler} instance whose {@link io.reactivex.Scheduler.Worker} instances queue work and execute them in a FIFO manner on one of the participating threads.

and

  • @return a {@link Scheduler} that queues work on the current thread
1048576

1 Answer


Neither the immediate nor the trampoline scheduler is suitable for returning to a specific thread. For that you need a single-threaded scheduler, which you can create from an ExecutorService:

ExecutorService exec = Executors.newSingleThreadExecutor();
Scheduler singleThreaded = Schedulers.from(exec);

Observable.fromCallable(() -> api.init())
    .subscribeOn(singleThreaded)                 // run api.init() on the single thread
    .observeOn(AndroidSchedulers.mainThread())   // hop to the main thread
    .doOnNext(v -> System.out.println("Initialized"))
    .observeOn(singleThreaded)                   // back to the single thread
    .map(v -> api.getData(v))
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(v -> System.out.println("Some data: " + v))
    .observeOn(singleThreaded)
    .doOnNext(v -> api.close())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(v -> System.out.println("Done"))
    .subscribe();                                // nothing runs until the chain is subscribed

// later
exec.shutdown();
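
To illustrate why a single-threaded executor works as a "return to" target, here is a minimal sketch using only the JDK (no RxJava). The class name SingleThreadDemo and the method threadsUsed are made up for this illustration. It submits several tasks to one single-thread executor and records which thread ran each of them:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of RxJava or of the wrapped library.
class SingleThreadDemo {

    // Submits `count` tasks to one single-thread executor and returns
    // the set of distinct threads that executed them.
    static Set<Thread> threadsUsed(int count) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Set<Thread> seen = new HashSet<>();
            Callable<Thread> whoAmI = Thread::currentThread;
            for (int i = 0; i < count; i++) {
                // get() blocks the caller until the task has run on the worker
                seen.add(exec.submit(whoAmI).get());
            }
            return seen;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        Set<Thread> seen = threadsUsed(5);
        System.out.println("distinct worker threads: " + seen.size());
        System.out.println("ran on caller thread: " + seen.contains(Thread.currentThread()));
    }
}
```

Every task lands on the same worker thread, never on the caller, which is the property Schedulers.from(exec) relies on when it is used with observeOn.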

Edit:

It is not possible to return to an arbitrary thread. However, if a thread has a looper/handler, then you can turn that into a scheduler with AndroidSchedulers and target it via observeOn.
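
The reason is that a thread can only receive work if it is draining some kind of queue, which is what a looper does. The following is a rough JDK-only sketch of that idea, not how RxJava or Android implement it. CallerLoopDemo, callerQueue and continuationRanOnCaller are hypothetical names. The caller hands work to a dedicated thread and then blocks on its own queue until the continuation is posted back:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class illustrating the "caller must drain a queue" idea.
class CallerLoopDemo {

    // Returns true if the continuation was executed on the calling thread.
    static boolean continuationRanOnCaller() {
        Thread caller = Thread.currentThread();
        BlockingQueue<Runnable> callerQueue = new LinkedBlockingQueue<>();
        boolean[] sameThread = {false};
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            single.execute(() -> {
                // The non-thread-safe library call would run here, on the single thread.
                // Then the continuation is posted to the queue owned by the caller.
                callerQueue.add(() -> sameThread[0] = (Thread.currentThread() == caller));
            });
            // The caller blocks until a task arrives and runs it itself.
            // On Android this draining role is played by the thread's Looper.
            callerQueue.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
        }
        return sameThread[0];
    }

    public static void main(String[] args) {
        System.out.println("continuation on caller: " + continuationRanOnCaller());
    }
}
```

A caller thread that does not cooperate like this (no loop, no queue) gives a scheduler nothing to post to, so there is no way to force code back onto it.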

akarnokd
  • My problem is different. I need to switch to a single thread (in my example I used the main thread, but it can be another one) and then observeOn the caller's thread, which can be anything the caller decided to use. – 1048576 Mar 13 '18 at 15:56
  • If that thread has a looper/handler, then you can turn that into a scheduler with `AndroidSchedulers`. – akarnokd Mar 13 '18 at 16:07
  • Right, OK. If you add this as an answer, I'll accept it. Thank you. – 1048576 Mar 13 '18 at 16:39