4

I have a list of tasks which should be handled one-by-one in a new thread and then the result should be displayed in a method by some main thread. However this doesn't seem to work, the flatMap method is invoked in the main thread.

Why does not the subscribeOn method handle the "thread switch" in this case?

What would be a better pattern to execute some work in another thread? (except from using Observable.create and creating a new thread manually, which is very verbose)

List<Task> tasks = ...;
Observable.from(tasks)
          .flatMap(task -> {
                // should be handled in a new thread
                try {
                    return Observable.just(task.call());
                } catch (Exception e) {
                    log.error("Error", e);
                }

                return Observable.empty();
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(MySchedulers.main())
            .subscribe(this::show); // subscribe called from main thread
lukstei
  • 844
  • 2
  • 8
  • 20

1 Answers1

3

Caveat: I'm not a Java programmer, but C#, so all the weird camel case method names freak me out and confuse me.

The subscribeOn is in the wrong place if you want a new thread for the flatMap operation. Insert it between from and flatMap. See this answer for a full explanation of subscribeOn and observeOn - it's written for .NET but the principles are the same.

I'm not familiar with tasks in Java, so I'm not sure if your Task is like .NET's Task and whether task.call() is asynchronous and launches its own thread - I guess not from your question since you said "...list of tasks which should be handled one-by-one in a new thread".

A newThread scheduler uses a new thread per subscriber - since flatMap will make a single subscription, all task.call invocations will be made on the same thread, distinct though that will be from the from operator's thread.

If task.call actually is asynchronous then the results will come back according to however it introduces concurrency and that will be independent of Rx's semantics.

Either way, the (correctly placed) observeOn will cause the results to be passed to this::show on the main thread.

Community
  • 1
  • 1
James World
  • 29,019
  • 9
  • 86
  • 120
  • It looks like `Task` here is some custom class that lukstei has created - we don't have a `Task` class with the same name & purpose as a .NET async Task. However, moving `subscribeOn` is right! – Adam S Feb 02 '15 at 14:15
  • @AdamS yes, you are right, `Task` should represent a single piece of work like a `Runnable` or a `Callable` – lukstei Feb 03 '15 at 15:47
  • @james-world thanks for the answer, regarding to the error handling: One single task could fail and should silently be ignored, but I see no advantage of using onError + onErrorResumeNext in contrast to my solution. – lukstei Feb 03 '15 at 15:53
  • Now that I understand more about what's going on with your `task`, I tend to agree, I'm editing that part out to avoid complicating the answer. – James World Feb 03 '15 at 16:11