My task is to get a JWT token. (All the details are in "How to get jwt token value in spring webflux? (to exchange it with Minio STS token)".)

But let's discard the redundant details. In a nutshell:

I have the following code:

@GetMapping(..)
public void someEndpoint(...) {
    Mono<Object> mono = ReactiveSecurityContextHolder.getContext()
        .map(securityContext -> securityContext.getAuthentication().getPrincipal());
    mono.block(); //<-- I need to get the result of Mono execution HERE at this thread in a blocking manner
     ...
}

And I get the error here:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2

I understand that Reactor forbids blocking calls on its non-blocking threads, but this code worked with previous versions of Reactor.

I started looking for a solution to my issue and created 2 topics:

  1. How to get jwt token value in spring webflux? (to exchange it with Minio STS token)
  2. How to get raw token from ReactiveSecurityContextHolder?

I was advised to make the blocking call in the way described here:

So my attempts are:

Attempt 1:

Mono<Object> mono = ReactiveSecurityContextHolder.getContext()
            .map(securityContext -> securityContext.getAuthentication().getPrincipal());
Mono<Object> objectMono = mono.subscribeOn(Schedulers.boundedElastic());
Object result = objectMono.block();

Attempt 2:

Mono<Object> mono = ReactiveSecurityContextHolder.getContext()
            .map(securityContext -> securityContext.getAuthentication().getPrincipal());
mono.subscribeOn(Schedulers.boundedElastic());
Object result = mono.block();

In both cases I receive the same error:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2

How can I fix it?

UPDATE 1:

I've also found a similar topic with the same question but without any answer: "How do i extract information from Jwt which is stored in ReactiveSecurityContextHolder. It returns a Mono<String> but I need String".

UPDATE 2:

All of the code snippets below lead to the same error (block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2):

A)

Mono.just("qwerty")
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.block()

B)

Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = customMono
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.boundedElastic());
System.out.println("blockedMono.block(): " + blockedMono.block());

C)

 Mono<String> customMono = Mono.just("qwerty");
 Mono<String> blockedMono =  Mono.just(0)
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.boundedElastic())
            .then(customMono);

 System.out.println("blockedMono.block(): " + blockedMono.block());
gstackoverflow

1 Answer

subscribeOn only applies to the subscribe event, i.e. when you first add your subscription. Events are still published on the default Scheduler, in this case 'parallel'.

To be able to make blocking calls you need to publishOn a Scheduler whose threads permit blocking, such as boundedElastic, so for instance:

Mono<Object> mono = ReactiveSecurityContextHolder.getContext()
            .map(securityContext -> securityContext.getAuthentication().getPrincipal());
Mono<Object> objectMono = mono.subscribeOn(Schedulers.boundedElastic())
                              .publishOn(Schedulers.boundedElastic()); // <- this

Object result = mono.block();

should do the trick.

Edit:

In your case you should not call block directly within the method annotated with @GetMapping.

Consider doing this:

public MinioAsyncClient getOrCreateClientForPrincipal(Object principal) {
    ...
}

@GetMapping(..)
public Mono<ResponseEntity<Boolean>> someEndpoint(...) {
    Mono<Boolean> mono = ReactiveSecurityContextHolder.getContext()
            .publishOn(Schedulers.boundedElastic()) //<-- this allows .block() somewhere downstream
            .subscribeOn(Schedulers.boundedElastic())
            .map(securityContext -> securityContext.getAuthentication().getPrincipal())
            .map(this::getOrCreateClientForPrincipal)
            .flatMap(asyncClient ->
                {
                    BucketExistsArgs args = ...;
                    try {
                        CompletableFuture<Boolean> existsQuery = asyncClient.bucketExists(args);
                        return Mono.fromFuture(existsQuery)
                             //allow Future to be cancelled if this Mono is cancelled
                             .doOnCancel(() -> existsQuery.cancel(true))
                             //unwrap CompletionException from the Future to its cause
                             .onErrorMap(CompletionException.class, e -> e.getCause());
                    } catch(InsufficientDataException
                            | InternalException
                            | InvalidKeyException
                            | IOException
                            | NoSuchAlgorithmException
                            | XmlParserException e) {
                        //We need to catch these only because they're declared by MinioAsyncClient.
                        //they aren't actually thrown
                        return Mono.error(e);
                    }
                }
            )
            //error handling, adapt as necessary
            .onErrorReturn(InsufficientDataException.class, ...)
            .onErrorReturn(InvalidKeyException.class, ...)
            .onErrorReturn(IOException.class, ...)
            .onErrorReturn(NoSuchAlgorithmException.class, ...)
            .onErrorReturn(XmlParserException.class, ...)
        ;
    
    return mono.map(ResponseEntity::ok);
    
}

Notes:

The method is declared as returning Mono, since you are in a reactive context anyway. This makes it easier to return appropriately after processing, instead of using void and pretending to have a 'normal' context.

If getOrCreateClientForPrincipal takes 'longer', you could consider making it return Mono<MinioAsyncClient> and calling it with .flatMap. Should you follow the suggestion in the other question of caching the client for each principal in order to avoid creating it on each call, you can do it in this method. You will need to figure out a way of cleaning up that cache, but that's beyond the scope of this answer.
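
As a rough illustration of the caching idea, here is a minimal sketch that uses only the JDK. The class name PrincipalClientCache, the evict method and the String stand-in for the client are all hypothetical, not part of MinIO or Spring. It also assumes the principal object has a sensible equals/hashCode and that the factory is quick, since computeIfAbsent runs it while holding a lock on part of the map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper, not part of MinIO or Spring: caches one client per principal.
final class PrincipalClientCache<C> {

    private final ConcurrentMap<Object, C> clients = new ConcurrentHashMap<>();
    private final Function<Object, C> factory;

    PrincipalClientCache(Function<Object, C> factory) {
        this.factory = factory;
    }

    // Returns the cached client, creating it at most once per principal.
    C getOrCreateClientForPrincipal(Object principal) {
        return clients.computeIfAbsent(principal, factory);
    }

    // Manual eviction hook; deciding when to call it is left open here.
    void evict(Object principal) {
        clients.remove(principal);
    }

    // Demo: counts how often the factory runs for three lookups of two principals.
    static int countCreations() {
        AtomicInteger created = new AtomicInteger();
        PrincipalClientCache<String> cache =
                new PrincipalClientCache<>(p -> "client-" + created.incrementAndGet());
        cache.getOrCreateClientForPrincipal("alice");
        cache.getOrCreateClientForPrincipal("alice"); // served from the cache
        cache.getOrCreateClientForPrincipal("bob");
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("clients created: " + countCreations()); // prints "clients created: 2"
    }
}
```

In the endpoint, the factory would be whatever builds the real client for a principal. If that step is slow or blocking, the cache would probably be better off holding a Mono or a future per principal instead.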

Error handling: MinioAsyncClient declares a bunch of exceptions that its methods won't actually throw directly. These exceptions can still be thrown within the Future, where they are wrapped in a CompletionException, which the above code unwraps back to its cause. The code then returns a default Boolean value for each exception type, but you can use the various .onError... methods to handle or propagate them instead.

You may even want to move them out one more level and handle on the outermost mono to map directly to an appropriate response code.
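
The CompletionException wrapping mentioned above can be observed with the JDK alone. The sketch below is only an illustration: failingLookup is a made-up stand-in for an async client call, and unwrap is a hypothetical helper that additionally guards against a missing cause. join() is used here just to surface the exception in a plain main method; the future is already completed, so nothing waits.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionUnwrapDemo {

    // Stand-in for an async client call whose future fails with a checked exception.
    static CompletableFuture<Boolean> failingLookup() {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        future.completeExceptionally(new IOException("bucket lookup failed"));
        return future;
    }

    // Returns the wrapped cause of a CompletionException, or the error itself otherwise.
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    // Joins the (already completed) future and describes the failure after unwrapping it.
    static String describeFailure(CompletableFuture<?> future) {
        try {
            future.join();
            return "no failure";
        } catch (CompletionException e) {
            Throwable cause = unwrap(e);
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        // prints "IOException: bucket lookup failed"
        System.out.println(describeFailure(failingLookup()));
    }
}
```

Once the original exception type is recovered this way, type-based handlers such as the .onErrorReturn(IOException.class, ...) calls in the answer have something to match on.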

Note how there is no call to block() - it shouldn't really be needed, so in fact this may very well work without the calls to publishOn or subscribeOn...

Thomas Timbul
  • I copy pasted your code and result is the same as in my question: `block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1` I also tried to replace `Object result = mono.block();` with `Object result = objectMono .block();` but I still see the same error. How can I fix it ? – gstackoverflow Jan 09 '23 at 11:04
  • I've been reading your other question about MinIO - can you use `MinioAsyncClient` to remain within an async context? `MinioAsyncClient` methods return Futures, so those would be easy to wrap with `Mono.fromFuture`. – Thomas Timbul Jan 09 '23 at 11:38
  • yes, I can switch to `MinioAsyncClient`. If it could fix my issue - please provide details because I don't understand how could it help for now. – gstackoverflow Jan 09 '23 at 11:50
  • To be honest it probably doesn't, because the CredentialsProvider is still not reactive. `block()` most likely still fails because itself is executed on one of the http threads. Can you share the code that surrounds your above block? – Thomas Timbul Jan 09 '23 at 12:17
  • 1. I end up with the same opinion about usage of `MinioAsyncClient`. 2. Maybe there is a way to execute Mono on a different thread. 3. Eventually I want to use this code inside `CredentialsProvider` implementation. For local testing I just wrote it inside controller method. – gstackoverflow Jan 09 '23 at 12:42
  • 4. Shared the code that surrounds my code block (updated topic) – gstackoverflow Jan 09 '23 at 12:49
  • I have updated my answer. It might be possible to avoid the `.block()` call completely... – Thomas Timbul Jan 09 '23 at 13:58
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/250969/discussion-between-thomas-timbul-and-gstackoverflow). – Thomas Timbul Jan 09 '23 at 13:59