5

I'm new to RxJava. I have the following code:

    System.out.println("1: " + Thread.currentThread().getId());
    Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            System.out.println("2: " + Thread.currentThread().getId());
            // query database
            String result = ....
            subscriber.onNext(result);
        }

    }).subscribeOn(Schedulers.newThread()).subscribe(countResult -> {
        System.out.println("3: " + Thread.currentThread().getId());
    });

For example, the output will be:

1: 50
2: 100
3: 100

I want the subscriber to run on the thread that has id 50, i.e. the thread that created the Observable. How can I do that?

Nayantara Jeyaraj
Meo Beo

5 Answers

1

I think there are two cases: you need this either because the code must run on the UI thread, or for synchronisation. As far as I know, you cannot simply call a function on a specific thread: a method runs on whichever thread invokes it, so one thread cannot directly call a method on another thread. Your problem is that the subscriber's method is called from the Schedulers.newThread() thread. I also found this GitHub issue about Schedulers.currentThread(). What you need is to notify the caller thread when the observer gets called.
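One way to read "notify the caller thread" is a hand-off queue: the background thread does the work and posts a callback, and the calling thread takes the callback from the queue and runs it itself. The following is a minimal plain-Java sketch of that idea, not RxJava API. `CallerThreadHandoff`, `runAndObserveOnCaller`, and the stand-in result string are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of RxJava: hands a callback back to the calling thread.
class CallerThreadHandoff {

    /**
     * Runs a stand-in "query" on a new thread, then executes the callback on
     * the thread that called this method.
     * Returns {callerThreadId, producerThreadId, callbackThreadId}.
     */
    static long[] runAndObserveOnCaller() {
        final long[] ids = {Thread.currentThread().getId(), -1L, -1L};
        final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            ids[1] = Thread.currentThread().getId();
            String result = "stand-in for the database result"; // assumption: placeholder data
            // Post the callback instead of running it here.
            mailbox.add(() -> {
                ids[2] = Thread.currentThread().getId();
                System.out.println("3: " + ids[2] + " received: " + result);
            });
        });
        worker.start();

        try {
            // The caller blocks until the worker posts, then runs the callback itself.
            mailbox.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the result", e);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println("1: " + Thread.currentThread().getId());
        runAndObserveOnCaller();
    }
}
```

Note that the calling thread is blocked while it waits, so this only gets back to "thread 50" by making that thread wait for the result.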

You can also use Akka; it makes concurrent code much simpler to write.

Sorry for my bad grammar.

Community
alex4o
  • Strange, they mention AndroidSchedulers.handlerThread, but there's no such method in my rxjava.. – Fen1kz Jul 26 '16 at 11:36
0

From the docs:

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

So you can drop the subscribeOn call and just call subscribe. The Observable then does its work and notifies the subscriber on the thread that calls subscribe, something like this:

    Observable.create(new rx.Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            System.out.println("2: " + Thread.currentThread().getId());
            // query database
            String result = ....
            subscriber.onNext(result);
        }

    }).subscribe(countResult -> {
        System.out.println("3: " + Thread.currentThread().getId());
    });

UPDATE: If your application is an Android application, you can keep subscribing on a background thread as you do now and pass the results to the main thread using Handler messages.

If your application is a plain Java application, I would suggest using the wait()/notify() mechanism, or consider frameworks such as EventBus or Akka for more complex scenarios.
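As a rough illustration of the wait()/notify() suggestion, here is a minimal sketch in which a background thread stores a result and the calling thread waits for it. `ResultBox` and its methods are hypothetical names made up for this example, and the result string is a placeholder for the database query.

```java
// Hypothetical holder class illustrating wait()/notify(); not from any library.
class ResultBox {
    private final Object lock = new Object();
    private String value;   // guarded by lock
    private boolean ready;  // guarded by lock

    /** Called by the background thread once the result is available. */
    void put(String v) {
        synchronized (lock) {
            value = v;
            ready = true;
            lock.notifyAll(); // wake up any thread blocked in await()
        }
    }

    /** Called by the original thread; blocks until put() has been called. */
    String await() {
        synchronized (lock) {
            try {
                while (!ready) { // loop guards against spurious wakeups
                    lock.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the result", e);
            }
            return value;
        }
    }

    /** Starts the stand-in query on a new thread and returns its result on the caller thread. */
    static String queryInBackgroundAndAwait() {
        ResultBox box = new ResultBox();
        new Thread(() -> box.put("stand-in result")).start(); // assumption: placeholder data
        return box.await();
    }
}
```

Whatever code consumes the return value of `queryInBackgroundAndAwait()` runs on the thread that called it, at the cost of blocking that thread until the result arrives.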

Mina Wissa
-1

With RxJava 1.0.15, you can apply toBlocking() before subscribe and everything will run on the thread that created the entire sequence of operators.

akarnokd
  • I don't think the OP wants to make his/her code synchronous. Instead he wants to switch to a certain thread before calling each subscriber. – FriendlyMikhail Nov 06 '15 at 14:45
  • The op mentions thread 50 which is the thread starting the async computation. There is no other way than blocking subscribe to get back to the same thread. – akarnokd Nov 06 '15 at 18:40
  • I was thinking about supplying an `ExecutorService` to `Schedulers.from(executorService)`, something like this: `ExecutorService executorService = Executors.newSingleThreadExecutor(r -> Thread.currentThread());`. Unfortunately, it crashes with `IllegalThreadStateException`. – AndroidEx Nov 08 '15 at 00:50
-1

subscribeOn denotes the thread on which the Observable will start emitting items; observeOn "switches" to a thread for the rest of the observable chain. Put an observeOn(schedulers.CurrentThread()) right before the subscribe and you'll be on the thread that this observable is created in rather than the thread it is executed in. Here's a resource that explains RxJava threading really well: http://www.grahamlea.com/2014/07/rxjava-threading-examples/

FriendlyMikhail
  • There is no CurrentThread scheduler in RxJava and there is no way of putting execution back on the same thread in different parts of the pipeline with multiple observeOn calls unless the scheduler is single-threaded. – akarnokd Nov 06 '15 at 18:36
  • try Schedulers.immediate(), which "Creates and returns a {@link Scheduler} that executes work immediately on the current thread." – FriendlyMikhail Nov 06 '15 at 19:29
  • That still doesn't stick to the creation thread and is effectively a no op. – akarnokd Nov 07 '15 at 06:05
-1

I believe @akarnokd is right. You either run without the .subscribeOn(Schedulers.newThread()), so that everything happens synchronously, or use toBlocking() just before the subscribe. Alternatively, if you just want everything to happen on the same thread, but it doesn't have to be the current thread, then you can do this:

Observable
    .defer(() -> Observable.create(...))
    .subscribeOn(Schedulers.newThread())
    .subscribe(subscriber);

defer ensures that the call to create happens on the subscription thread.
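For comparison, the "same thread, but not necessarily the current one" idea can be sketched without RxJava using a single-threaded executor, which is the single-threaded case mentioned in the comments above. This is only an analogue under stated assumptions: `SingleThreadPipeline` and `run` are hypothetical names, and the two tasks stand in for the producer and the subscriber.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical analogue, not RxJava: producer and consumer share one worker thread.
class SingleThreadPipeline {

    /** Returns {callerThreadId, producerThreadId, consumerThreadId}. */
    static long[] run() {
        long callerId = Thread.currentThread().getId();
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for the work done inside OnSubscribe.call.
            Callable<Long> produce = () -> {
                return Thread.currentThread().getId();
            };
            // Stand-in for the subscriber's onNext.
            Callable<Long> consume = () -> {
                return Thread.currentThread().getId();
            };
            long producerId = single.submit(produce).get();
            long consumerId = single.submit(consume).get();
            return new long[] {callerId, producerId, consumerId};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            single.shutdown(); // let the worker thread exit
        }
    }
}
```

Both tasks report the same thread id, which differs from the caller's id, because the executor owns exactly one worker thread.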

Dave Moten