
I have written a Spring Boot microservice using RxJava (an aggregating service) to implement the following simplified use case. The big picture is that when an instructor uploads a course content document, a set of questions should be generated and saved.

  • The user uploads a document to the system.
  • The system calls a Document Service to convert the document into text.
  • It then calls a question-generating service to generate a set of questions from that text content.
  • Finally, these questions are posted to a basic CRUD microservice to be saved.

When a user uploads a document, many questions are created from it (possibly hundreds). The problem is that I post the questions to the CRUD service one at a time, sequentially. Because each save is an IO-bound network call, this slows the operation drastically: the entire process takes around 20 seconds. Here is the current code, assuming all the questions have already been generated.

questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList();

private Observable<QuestionDTO> createQuestion(QuestionDTO question) {
    return Observable.<QuestionDTO> create(sub -> {
        QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API,
                new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody();
        sub.onNext(questionCreated);
        sub.onCompleted();
    }).doOnNext(s -> log.debug("Question was created successfully."))
            .doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage()));
}

Now my requirement is to post all the questions to the CRUD service in parallel and merge the results on completion. Note that the CRUD service accepts only one question object at a time, and that cannot be changed. I know that I can use the Observable.zip operator for this purpose, but I have no idea how to apply it in this context, since the actual number of questions is not predetermined. How can I change the first line of the code above to improve the performance of the application? Any help is appreciated.

Ravindra Ranwala

1 Answer


By default, the observables in flatMap operate on the same scheduler that you subscribed on. To run your createQuestion observables in parallel, you have to subscribe them on a computation scheduler.

questions.flatMapIterable(list -> list)
        .flatMap(q -> createQuestion(q).subscribeOn(Schedulers.computation()))
        .toList();

Check this article for a full explanation.

Lamorak
  • Thanks a lot for the response. I went through that article, but when I tried the examples, some of them did not work as expected. Anyway, I have another question now. createQuestion is executed asynchronously. Previously it was invoked in a single thread. With the new modification it will be invoked in parallel threads. In both circumstances, for each question there will be a separate thread dealing with the creation behind the scenes. Will there be a huge gain due to this change? If so, how? – Ravindra Ranwala Apr 13 '17 at 14:22
  • @RavindraRanwala flatMap creates a new Observable for each item it receives and merges these Observables into one. So if your items are emitted quickly (as they are from your list) and createQuestion is slow, the createQuestion calls will run in "parallel". – Phoenix Wang Apr 13 '17 at 15:51
  • @PhoenixWang Thanks, I'll give it a try and keep you posted on the performance gain. It will take few days since I am away from my office. – Ravindra Ranwala Apr 13 '17 at 16:16
  • `Schedulers.computation()` is not appropriate here, as it will block computation threads on IO tasks. It should be `.flatMap(q -> createQuestion(q).subscribeOn(Schedulers.io()), N)` where N is the number of observables that `flatMap` will subscribe to at any one time. In effect this is the parallelism limit for this operation - the default of 128 is (IMHO) too big for most use cases. – Tassos Bassoukos Apr 13 '17 at 17:48
  • Thanks, I'll give it a try and let you know. It will take a while since I am away from the office. I would appreciate it if you could explain how this improves the performance of my app. – Ravindra Ranwala Apr 14 '17 at 03:13
  • What is the proper way to handle error which occurs inside createQuestion method? – Almighty Apr 10 '19 at 17:41
  • That's quite a broad question, maybe you can form it as a whole new question and provide more specifics to your case. – Lamorak Apr 11 '19 at 11:20
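
To illustrate why a bounded parallelism limit helps with blocking IO calls like the ones discussed in the comments above, here is a minimal plain-JDK sketch. It is not RxJava and not the code from the question: it swaps in an ExecutorService and CompletableFuture to show the same idea. The names ParallelPostSketch, postAll, and fakePost are hypothetical, and fakePost only sleeps to stand in for the HTTP POST to the CRUD service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ParallelPostSketch {

    /**
     * Runs a blocking call for every item on a fixed-size thread pool and
     * returns the results in input order. maxConcurrency plays the role of
     * the parallelism limit N mentioned in the comments.
     */
    public static <T, R> List<R> postAll(List<T> items,
                                         Function<T, R> blockingCall,
                                         int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            // Submit every call first, so the pool can work on up to
            // maxConcurrency of them at the same time.
            List<CompletableFuture<R>> futures = items.stream()
                    .map(item -> CompletableFuture.supplyAsync(
                            () -> blockingCall.apply(item), pool))
                    .collect(Collectors.toList());
            // Only then wait for the results and merge them into one list.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    /** Hypothetical stand-in for the blocking HTTP POST (assumption: ~100 ms per call). */
    public static String fakePost(String question) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "saved:" + question;
    }

    public static void main(String[] args) {
        List<String> questions = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            questions.add("q" + i);
        }
        long start = System.nanoTime();
        List<String> saved = postAll(questions, ParallelPostSketch::fakePost, 10);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(saved.size() + " questions saved in about " + elapsedMs + " ms");
    }
}
```

With 20 simulated calls and a limit of 10, the waits overlap in roughly two batches instead of running as 20 sequential waits, which is where the gain for IO-bound work comes from. The limit also caps how many requests hit the CRUD service at once.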