1

I have to write into a file based on the incoming requests. As multiple requests may come simultaneously, I don't want multiple threads trying to overwrite the file content together, which may lead into losing some data.

Hence, I tried collecting all the requests' data using a instance variable of PublishSubject. I subscribed publishSubject during init and this subscription will remain throughout the life-cycle of application. Also I'm observing the same instance on a separate thread (provided by Vertx event loop) which invokes the method responsible for writing the file.

private PublishSubject<FileData> publishSubject = PublishSubject.create();

private void init() {
    publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}

Later during request handling, I call onNext as below:

handleRequest() {
   //do some task
   publishSubject.onNext(fileData);
}

I understand that, when I call onNext, the data will be queued up, to be written into the file by the specific thread which was assigned by observeOn operator. However, what I'm trying to understand is

  1. whether this thread gets blocked in WAITING state for only this task? Or,
  2. will it be used for other activities also when no file writing happens? I don't want to end up with one thread from the vertx event loop wasted in waiting state for going with this approach. Also, please suggest any better approach, if available.

Thanks in advance.

ramtech
  • 757
  • 6
  • 15

1 Answers1

1

Actually RxJava will do it for you, by definition onNext() emissions will act in serial fashion:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. (Observable Contract)

So as long as you will run blocking calls inside the onNext() at the subscriber (and will not fork work to a different thread manually) you will be fine, and no parallel writes will be happen.

Actually, you're worries should come from the opposite direction - Backpressure.
You should choose your backpressure strategy here, as if the requests will come faster then you will process them (writing to file) you might overflow the buffer and get into troubles. (consider using Flowable and choose you're backpressure strategy according to your needs.

Regarding your questions, that depends on the Scheduler, you're using RxHelper.blockingScheduler(vertx) which seems like your custom code, so I can't tell, if the scheduler is using shared thread in work queue fashion then it will not stay idle.
Anyhow, Rx will not determine this for you, the scheduler responsibility is to assign the work to some thread according to its logic.

yosriz
  • 10,147
  • 2
  • 24
  • 38
  • Thanks for the inputs. `RxHelper` is provided by Reactive Extensions library for Vert.x. Using `RxHelper.scheduler(vertx)` I'm getting one thread from existing vertx event loop pool, I don't handle it manually. Regarding backpressure, I'm not sure how to identify the back-pressure limit (load it till the breaking point?). Also, I should be looking into Flowable, I'm not aware of it. – ramtech Apr 25 '17 at 13:09
  • I went through `Flowable` and other similar classes such as `Completable`, `Single` etc. Thanks for directing me towards that. Some related questions that were useful for me: [(1)](http://stackoverflow.com/questions/40323307/observable-vs-flowable-rxjava2), [(2)](http://stackoverflow.com/questions/42525623/rxjava-2-x-should-i-use-flowable-or-single-completable). However, I can't use them right now as I'm using Rx-vertx library, which is [yet to upgrade to Rx2](https://github.com/vert-x3/issues/issues/189). – ramtech Apr 27 '17 at 09:56
  • Accepting the answer, as it conveyed that Rx doesn't block the thread assigned to consumer by `observeOn`, instead scheduler takes care of assigning tasks it. – ramtech Apr 27 '17 at 09:59
  • regarding the Scheduler, as you were saying you're assigning a loop-based thread for this purpose, so it will basically wait when no task is in queue (unless its a shared thread using other tasks in vertx) , but I don't see why it should bother you (unless you are short on threads count). regarding the backpressure, it's not that RxJava1 don't have backpressure, it have, it just that in RxJava2 it was separated as a desginated feature to Flowable, I assume your'e using RxJava2 from the tag. – yosriz Apr 27 '17 at 10:05
  • so you can definitely use backpressure operators, like `onBackpressureBuffer()`, `onBackPressureDrop()`, rx uses default buffer of 128,so using backpressure, and the type decision depends on your scenario. – yosriz Apr 27 '17 at 10:11
  • Thanks for more details on that. Regarding back pressure, in my scenario requests will come from GUI exposed to very few users. Based on the additional references (which I shared in previous comments), I decided not to explicitly handle back pressure for this scenario. – ramtech Apr 28 '17 at 14:15