Similar to Spring Reactor: How to throw an exception when publisher emit a value?

I have a finder method, findSomePojo, in my Java DAO that returns a SomePojo. The finder calls the Amazon DynamoDB APIs, and software.amazon.awssdk.services.dynamodb.model.GetItemResponse holds the output of the call. I am trying a hasElement() check in the createSomePojo method of my service layer. (I am not sure I am using it correctly; I was experimenting and debugging.)

Basically, I want to check whether the element already exists. If it does, saving is illegal, so I must not call the DAO's save and need to throw an exception instead.

Assuming there is already a SomePojo record in the DB, I invoke create_SomePojo on the service. But I see in the logs that the filter is not working, and I get an NPE when Reactor invokes createModel_SomePojo, which makes me believe the NPE is thrown even after the filter check.

// Service: SomePojoService; it has create_SomePojo, find_SomePojo, etc.

Mono<Void>  create_SomePojo(reqPojo){

// Before calling the DAO's save, I call the service's find (which simply calls the DAO's find, shown below this method)
       Mono<Boolean> monoPresent = find_SomePojo(accountId, contentIdExtn)
                 .filter(i -> i.getId() != null)
                 .hasElement();
       System.out.println("monoPresent="+monoPresent.toString());
       if(monoPresent.toString().equals("MonoHasElement")){
       //*************it comes here i see that***********//
           System.out.println("hrereee monoPresent="+monoPresent);
          // Mono<Error> monoCheck=
                return  monoPresent.handle((next, sink) -> sink.error(new SomeException(ITEM_ALREADY_EXISTS))).then();
       } else {
           return SomePojoRepo.save(reqPojo).then();
       }

}

Mono<SomePojo> find_SomePojo(id){
    return SomePojoRepo.find(id);
}

==============================================================

// DAO: SomePojoRepo.java; it has save, find, delete
Mono<SomePojo> find( String id) {
    Mono<SomePojo> fallback = Mono.empty();
    Mono<GetItemResponse> monoFilteredResponse = monoFuture
        .filter(getItemResponse -> getItemResponse.item().size() > 0&& getItemResponse!=null);
    Mono<SomePojo> result = monoFilteredResponse
        .map(getItemResponse -> createModel_SomePojo(getItemResponse.item()));

    Mono<SomePojo> deferedResult = Mono.defer(() -> result.switchIfEmpty(fallback));
        return deferedResult;
}

I see there is a hasElement() method on Mono, but I am not sure how to use it correctly. I can get an exception if I call the DAO's save directly from create_SomePojo(reqPojo) without any of this finder check, because the primary key constraint will throw an exception that I can rethrow and then catch in the service. But what if I want to do the check in the service and throw an exception with error codes? The idea is not to pass a response error object to the DAO layer.

mslowiak
Vivek Misra
  • Are you sure the problem is not connected with the condition in the filter lambda expression? The expression is evaluated from left to right. Imho you should check getItemResponse != null first – mslowiak Jul 31 '19 at 21:03
  • added some details – Vivek Misra Jul 31 '19 at 21:11
  • change filter condition to: getItemResponse -> getItemResponse!=null && getItemResponse.item().size() > 0 – mslowiak Jul 31 '19 at 21:20
  • OK. That fixed the filter condition. But somehow hasElement() is not working now. I thought it was working, but now even for a new request it goes into the first if branch rather than saving directly. So now I always see my custom exception :). I guess I am not using hasElement properly, or what is the proper way to handle this? – Vivek Misra Jul 31 '19 at 21:31
  • create_SomePojo() and createModel_SomePojo() are the same in this code? – mslowiak Jul 31 '19 at 21:32
  • createModel_SomePojo() is just a POJO utility mapper based on the Amazon DynamoDB mapping. It lives in the DAO. It basically converts a GetItemResponse by mapping DynamoDB columns to SomePojo. You can call it mapToPojo() for clarity. The DAO just has save/find/delete; the service has the create_SomePojo and find_SomePojo signatures – Vivek Misra Jul 31 '19 at 21:40

1 Answer

Try the Hooks.onOperatorDebug() hook to get a better debugging experience.

The correct way to use hasElement (assuming that find_SomePojo never returns null):

Mono<Boolean> monoPresent =  find_SomePojo(accountId, contentIdExtn)
        .filter(i -> i.getId() != null)
        .hasElement();

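return monoPresent.flatMap(isPresent -> {
    if (isPresent) {
        // Signal the error through the pipeline instead of throwing it.
        return Mono.<Void>error(new SomeException(ITEM_ALREADY_EXISTS));
    } else {
        // then() discards whatever save() emits, leaving a Mono<Void>.
        return SomePojoRepo.save(reqPojo).then();
    }
});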
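
To try the same pattern outside the application, here is a minimal, self-contained sketch. It assumes reactor-core is on the classpath. The in-memory STORE map, the find/save/create helpers and ItemAlreadyExistsException are hypothetical stand-ins for the DAO and SomeException from the question, not the real classes. The point it illustrates is that hasElement() always emits a Boolean (false for an empty source), so the flatMap always runs and decides between the error and the save.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Mono;

public class CreateIfAbsentSketch {

    // Hypothetical stand-in for SomeException(ITEM_ALREADY_EXISTS).
    static class ItemAlreadyExistsException extends RuntimeException {
        ItemAlreadyExistsException(String id) {
            super("Item already exists: " + id);
        }
    }

    // Hypothetical in-memory replacement for the DynamoDB-backed DAO.
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Completes empty when nothing is stored under the id.
    static Mono<String> find(String id) {
        return Mono.defer(() -> Mono.justOrEmpty(STORE.get(id)));
    }

    // Stores the value on subscription, then completes without emitting.
    static Mono<Void> save(String id, String value) {
        return Mono.fromRunnable(() -> STORE.put(id, value));
    }

    // Saves only if no element exists; otherwise signals an error.
    static Mono<Void> create(String id, String value) {
        return find(id)
                .hasElement()
                .flatMap(exists -> exists
                        ? Mono.<Void>error(new ItemAlreadyExistsException(id))
                        : save(id, value));
    }

    public static void main(String[] args) {
        create("a", "first").block();       // nothing stored yet, so this saves
        try {
            create("a", "second").block();  // record exists, so this errors
        } catch (ItemAlreadyExistsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

block() is used here only to drive the demo from main; in a reactive service the returned Mono would normally be handed back to the caller.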

Sidenote

There is a common misconception about what a Mono actually is. It does not hold any data; it is just a fragment of a pipeline that transmits the signals and data flowing through it. Therefore, the line System.out.println("monoPresent="+monoPresent.toString()); makes no sense, because it just prints the hasElement() decorator around the existing pipeline. The internal name of this decorator is MonoHasElement, so MonoHasElement is printed no matter what the pipeline would emit (true or false).

Correct ways to print signals (and the data transmitted along with them) are: Mono.log(), Mono.doOnEach()/doOnNext(System.out::println), or System.out.println("monoPresent="+monoPresent.block());. Beware of the third one: it blocks the whole thread until the data is emitted, so use it only if you know what you are doing.

An example of printing Monos to play with:

    Mono<String> abc = Mono.just("abc").delayElement(Duration.ofSeconds(99999999));

    System.out.println(abc); //this will print MonoDelayElement instantly
    System.out.println(abc.block()); //this will print 'abc', if you are patient enough ;^)
    abc.subscribe(System.out::println); //this will also print 'abc' after 99999999 seconds, but without blocking current thread
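
The non-blocking doOnNext option mentioned above can be sketched in the same spirit. This is only an illustration, assuming reactor-core is on the classpath; the SEEN list and the observed helper are hypothetical names introduced for the demo. It shows that the side effect runs only once something subscribes, and that it then sees the value hasElement() emits.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Mono;

public class PrintSignalsSketch {

    // Hypothetical sink that records what flowed through the pipeline.
    static final List<String> SEEN = new CopyOnWriteArrayList<>();

    // Attaches a side effect to the Boolean emitted by hasElement().
    static Mono<Boolean> observed(Mono<String> source) {
        return source
                .hasElement()
                .doOnNext(present -> SEEN.add("present=" + present));
    }

    public static void main(String[] args) {
        Mono<Boolean> pipeline = observed(Mono.just("abc"));

        // Nothing has been recorded yet: assembling a pipeline runs nothing.
        System.out.println(SEEN.isEmpty()); // true

        // Subscribing triggers the pipeline and therefore the side effect.
        pipeline.subscribe();
        observed(Mono.empty()).subscribe();

        System.out.println(SEEN); // [present=true, present=false]
    }
}
```

Chaining .log() in place of doOnNext reports every signal (subscription, value, completion) through Reactor's logger rather than only the emitted value.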
arap