1

While working with Spring Webflux, I'm trying to insert some data in the realm object server which interacts with Java apps via a Rest API. So basically I have a set of students, who have a set of subjects and my objective is to persist those subjects in a non-blocking manner. So I use a microservice exposed via a rest endpoint which provides me with a Flux of student roll numbers, and for that flux, I use another microservice exposed via a rest endpoint that gets me the Flux of subjects, and for each of these subjects, I want to persist them in the realm server via another rest endpoint. I wanted to make this all very nonblocking which is why I wanted my code to look like this.

void foo() {
studentService.getAllRollnumbers().flatMap(rollnumber -> {
    return subjectDirectory.getAllSubjects().map(subject -> {
        return dbService.addSubject(subject);
    })
});

}

But this doesn't work for some reason. But once I call blocks on the things, they get into place, something like this.

Flux<Done> foo() {
    List<Integer> rollNumbers = studentService.getAllRollnumbers().collectList().block();

    rollNumbers.forEach(rollNumber -> {
        List<Subject> subjects = subjectDirectory.getAllSubjects().collectList().block();

    subjects.forEach(subject -> {dbService.addSubject(subject).block();});
    });

    return Flux.just(new NotUsed());
}

getAllRollnumbers() returns a flux of integers.
getAllSubjects() returns a flux of subject.
and addSubject() returns a Mono of DBResponse pojo.

What I can understand is that the thread executing this function is getting expired before much of it gets triggerred. Please help me work this code in an async non blocking manner.

ayush prashar
  • 436
  • 5
  • 13

1 Answers1

0

You are not subscribing to the Publisher at all in the first instance that is why it is not executing. You can do this:

studentService.getAllRollnumbers().flatMap(rollnumber -> {
    return subjectDirectory.getAllSubjects().map(subject -> {
        return dbService.addSubject(subject);
    })
}).subscribe();

However it is usually better to let the framework take care of the subscription, but without seeing the rest of the code I can't advise.

McGin
  • 1,361
  • 2
  • 13
  • 31
  • this happens to be a scheduled process so all what happens here is, this function is a part of a configuration class,that has scheduling enabled. No other code! – ayush prashar Mar 29 '19 at 03:14