When I need a particular piece of Observable.create code to run on a specific thread (e.g. a background thread), I worry that the subscribeOn operator might not work, because I may later chain this observable sequence to another sequence that runs on the main thread (using observeOn).

Example

The situation is that I have an Observable sequence running on the main thread (e.g. an alert box asking the user whether or not to perform the network call).

Would it be better to ensure that this Observable.create code runs in the background thread by having something like:

Observable<String>.empty()
   .observeOn(ConcurrentDispatchQueueScheduler(queue: background.queue))
   .concat(myObservableNetworkCall)

Why not just use subscribeOn?

The problem is that if I used subscribeOn (the second call) and the previous observable (the alert controller) had already been set to run on the background thread with subscribeOn (the first call), then the second subscribeOn would have no effect, since the first call is closer to the source observable:

If you specify multiple subscribeOn() operators, the one closest to the source (the left-most) will be the one used.

That may be the behavior in RxJava, but I am not sure about Swift. Reactivex.io simply says that we should not call subscribeOn multiple times.

I tend to wrap operations in Observable<Void>s, and they need to run on different threads. That is why I am asking how to ensure that an Observable's code runs on the thread I specified. subscribeOn alone wouldn't work because the observable may be concatenated with another one.

I want the thread it should run in to be encapsulated in my Observable definition, not higher up in the chain.

Is the best practice to do the following:

  1. Start with an Observable.empty using the data type I wish to use.
  2. Use observeOn to force the thread that I want it to run in.
  3. Concatenate it with the actual Observable that I want to use.

Edit

  1. I have read the subscribeOn and observeOn documentation on reactivex.io.

  2. I'm familiar with how to switch between threads using subscribeOn and observeOn.

  3. What I'm specifically concerned about is the complication of using subscribeOn when concatenating or combining observable sequences.

  4. The problem is that the observables need to run on one specific thread, AND they don't know where or with what they'll be concatenated. Since I know exactly which thread they should run on, I'd prefer to encapsulate the scheduler within the observable's definition instead of specifying it when I'm chaining a sequence.

dsapalo
  • Have you had a look at Drivers? They are executed on the same thread. https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Units.md – Ocunidee Feb 07 '17 at 09:48
  • You would use it this way (imagine you have a Variable `error` in your viewModel): `viewModel.error.filter { $0 != nil }.asDriver().drive(onNext: { [unowned self] error in self.handleError(error!) }).addDisposableTo(disposeBag)` – Ocunidee Feb 07 '17 at 09:55
  • Yes, I've read about Drivers already, but that solves a different problem. If it makes it clearer, imagine two separate background threads (not the main thread) for my initial example. My confusion is about the nature of the `subscribeOn` operator and what the recommended best practice is for ensuring that an `Observable.create()` that was concatenated runs on a specific thread. – dsapalo Feb 07 '17 at 12:12

1 Answer


It is better not to specify inside the function declaration which thread the observable should run on. For instance:

func myObservableNetworkCall() -> Observable<String> {
    return Observable<String>.create { observer in
        // your network code here

        return Disposables.create {
            // Your dispose
        }
    }
}

func otherObservableNetworkCall(s: String) -> Observable<String> {
    return Observable<String>.create { observer in
        // your network code here

        return Disposables.create {
            // Your dispose
        }
    }
}

And then switch between Schedulers:

myObservableNetworkCall()
    .observeOn(ConcurrentDispatchQueueScheduler(queue: background.queue)) // background queue: network request, mapping, etc.
    .flatMap { string in
        otherObservableNetworkCall(s: string)
    }
    .observeOn(MainScheduler.instance) // switch to MainScheduler for UI updates
    .subscribe(onNext: { string in
        // do something
    })
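
If the scheduler should instead live inside each observable's definition, one option is to apply subscribeOn inside the factory function itself and then concatenate the results. The "closest to the source wins" rule applies within a single chain, and concat subscribes to each source separately, so each inner subscribeOn should only affect its own create closure. Below is a minimal sketch of that idea, not a definitive implementation: step, makeTaggedQueue, queueNameKey and the queue labels are hypothetical names used only for illustration, and it keeps the older operator spelling used above (subscribeOn; RxSwift 6 spells it subscribe(on:)).

```swift
import Foundation
import RxSwift

// Hypothetical helper: tag each queue so code can report where it runs.
let queueNameKey = DispatchSpecificKey<String>()

func makeTaggedQueue(_ name: String) -> DispatchQueue {
    let queue = DispatchQueue(label: name)
    queue.setSpecific(key: queueNameKey, value: name)
    return queue
}

// Two separate background queues (placeholder labels).
let queueA = makeTaggedQueue("example.queueA")
let queueB = makeTaggedQueue("example.queueB")

// Hypothetical factory: the scheduler is part of the observable's definition,
// because subscribeOn is applied directly to the created source.
func step(_ label: String, on queue: DispatchQueue) -> Observable<String> {
    return Observable<String>.create { observer in
        // Report the queue this closure is actually running on.
        let current = DispatchQueue.getSpecific(key: queueNameKey) ?? "untagged"
        observer.onNext("\(label) ran on \(current)")
        observer.onCompleted()
        return Disposables.create()
    }
    .subscribeOn(ConcurrentDispatchQueueScheduler(queue: queue))
}

let finished = DispatchSemaphore(value: 0)
var results: [String] = []

// concat emits sequentially, so results is never mutated concurrently here.
let subscription = step("first", on: queueA)
    .concat(step("second", on: queueB))
    .subscribe(
        onNext: { results.append($0) },
        onCompleted: { finished.signal() }
    )

// Block the calling thread until the whole sequence has completed.
finished.wait()
print(results)
subscription.dispose()
```

Under these assumptions, the first closure should report example.queueA and the second example.queueB, whatever observeOn calls are added further down the chain, since observeOn only affects where downstream events are delivered.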
xandrefreire
  • Thank you, but that is not my problem. I am familiar with how to use `subscribeOn` and `observeOn`. If you could please re-read the question to understand the complication about `subscribeOn` when concatenating sequences. – dsapalo Feb 07 '17 at 12:13
  • What I meant was that, as shown in my question, this is how I do it right now: an empty observable of the data type that I use, concatenated with the observable that I want to run. Is this your recommendation as the best practice for ensuring something runs in the background thread? I'm not sure what the best practice is. – dsapalo Feb 07 '17 at 12:21
  • from [this question](http://stackoverflow.com/questions/37973445/does-the-order-of-subscribeon-and-observeon-matter) `subscribeOn()` tells the whole chain which thread to start processing on. You can only call it once per chain. If you call it again lower down the stream it will have no effect. `observeOn()` causes all operations which happen below it to be executed on the specified scheduler. You can call it multiple times per stream to move between different threads. – xandrefreire Feb 07 '17 at 12:23
  • And a [good read](https://www.thedroidsonroids.com/blog/ios/rxswift-examples-4-multithreading/) explaining RxSwift Multithreading – xandrefreire Feb 07 '17 at 12:25
  • It is clear to me how to use `subscribeOn` and `observeOn`. I need clarification with the concatenation of two observables that have called `subscribeOn`, on ensuring that both of them run on the thread I have specified. Is the best practice to start with an empty observable, specify the scheduler, then concatenate it? – dsapalo Feb 07 '17 at 12:37
  • So, you have two observables that have called `subscribeOn`. Next, you probably combine these observables and `subscribeOn` does not work. Is that correct? – xandrefreire Feb 07 '17 at 13:48
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/135079/discussion-between-xfreire-and-iwillnot). – xandrefreire Feb 07 '17 at 13:56