
I run my Netty-based Kotlin application with Spring Boot and WebFlux. The details are as follows:

  • Java 11;
  • Kotlin 1.3.61;
  • Spring Boot 2.2.5.RELEASE;
  • Spring Vault Core 2.2.2.RELEASE.

I receive a file in the web layer. WebFlux creates a Part (org.springframework.http.codec.multipart) out of it. The Part exposes the data as a Project Reactor Flux, a stream of DataBuffer chunks of 4 KB each:

Flux<DataBuffer> content();

To keep the frameworks consistent across the code, I convert the Flux to a Kotlin Flow.

Then I call the synchronous Vault client's encrypt(...) inside flatMapMerge, which (as far as I understand) submits the chunks concurrently. Note that encrypt(...) is not a suspend function; it is a wrapper around an HTTP client that calls a remote encryption provider:

public String encrypt(String keyName, String plaintext);

I have checked this answer https://stackoverflow.com/a/58659423/6612401 and found that the Flow-based approach with flow { emit(...) } is the recommended one.

My question is: can I use this Flow-based approach with non-suspend functions? Or is there a better approach, considering that I am using runBlocking(Dispatchers.IO) and the suspend function fold(...)?

The code is as follows:

@FlowPreview
@ExperimentalCoroutinesApi
private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
    val pair = part.content().asFlow()
            .flatMapMerge { dataBuffer ->
                val openByteArray = dataBuffer.asInputStream().readBytes()
                val opentextBase64 = Base64Utils.encodeToString(openByteArray)
                flow { emit(Pair(openByteArray, vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
            }.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
                result.first.writeBytes(curPair.first)
                result.second.append(curPair.second)
                result
            }
    Pair(pair.first.toByteArray(), pair.second.toString())
}

P.S. The fold(...) function collects the open (plaintext) chunks into a ByteArrayOutputStream, so that a hash can be calculated later, and collects the encrypted chunks into a StringBuilder, which becomes the encrypted file.
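
Since both the hash and the encrypted output depend on chunk order, it is worth noting that flatMapMerge does not guarantee that merged results arrive in the order of the source elements. A minimal sketch of an order-preserving alternative follows, assuming the chunks are already collected into a list and Base64-encoded. The names `encryptAllInOrder`, `maxConcurrent`, and the `encrypt` lambda are hypothetical; the lambda stands in for a blocking call such as `vaultTransitTemplate.encrypt(KEY_NAME, chunk)`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: runs a blocking `encrypt` call for every chunk on
// Dispatchers.IO, with at most `maxConcurrent` calls in flight at once.
suspend fun encryptAllInOrder(
    chunks: List<String>,
    maxConcurrent: Int,
    encrypt: (String) -> String // assumed to be a blocking, non-suspend call
): List<String> = coroutineScope {
    val permits = Semaphore(maxConcurrent)
    chunks
        .map { chunk ->
            // Each chunk gets its own coroutine; the semaphore bounds
            // how many of them execute the blocking call simultaneously.
            async(Dispatchers.IO) {
                permits.withPermit { encrypt(chunk) }
            }
        }
        // awaitAll returns results in the order of the Deferred list,
        // i.e. the order of the input chunks, not completion order.
        .awaitAll()
}
```

Called from a suspend function, this returns the encrypted chunks in input order regardless of which request finishes first. Like the fold(...) above, it keeps the whole file in memory.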

P.P.S. I have tried my approach. The method submits 5-7 parallel requests on average on my machine (8th-gen Core i5, 4 physical cores). It does its job, but not that fast. With Vault deployed remotely rather than locally, I get roughly 1 second per 1 MB of encryption. I understand that this depends on network latency. I am not even considering the speed of encryption on the Vault side; it is lightning fast because the chunks are only 4 KB. Are there any ways to increase the speed concurrency-wise?

P.P.P.S. I have tried setting concurrency = MAX_CONCURRENT_REQUESTS in flatMapMerge { ... }. It has made no significant difference so far; leaving the default actually works better.
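
One way to check how many blocking calls actually overlap for a given concurrency value is to count them directly. The following is only a sketch under stated assumptions: `peakInFlight` and `blockingCall` are hypothetical names, and `blockingCall` stands in for the real `encrypt(...)` request. Each inner flow is moved to Dispatchers.IO with flowOn, so the blocking call does not depend on the dispatcher of the collecting coroutine.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical probe: issues `requests` blocking calls through flatMapMerge
// and returns the highest number of calls observed running at the same time.
@FlowPreview
suspend fun peakInFlight(
    requests: Int,
    concurrency: Int,
    blockingCall: () -> Unit // stand-in for the real blocking HTTP call
): Int {
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    (1..requests).asFlow()
        .flatMapMerge(concurrency) {
            flow {
                val now = inFlight.incrementAndGet()
                peak.updateAndGet { current -> maxOf(current, now) }
                try {
                    blockingCall()
                } finally {
                    inFlight.decrementAndGet()
                }
                emit(Unit)
            }.flowOn(Dispatchers.IO) // run the blocking part on the IO pool
        }
        .toList() // terminal operator; the emitted values are not needed
    return peak.get()
}
```

If the measured peak stays well below the requested concurrency, the limit is probably imposed elsewhere, for example by the connection pool of the underlying HTTP client, which is an assumption to verify rather than a known cause.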

Evgeny Mamaev
  • I don't see any IO in this code, it's CPU-bound. If you want the best performance for parallelizable CPU-bound work, use Java Streams. Kotlin Flows are designed to support suspendable stream transformations, but you don't use those. – Marko Topolnik May 20 '20 at 11:44
  • @MarkoTopolnik Vault is not a pluggable library. Encryption goes on a remote server, Vault here is only an HTTP client to Vault. `vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64)` is an HTTP client call. – Evgeny Mamaev May 20 '20 at 12:08
  • So your goal is to increase the number of concurrent requests? You could try playing with `concurrency = MAX_CONCURRENT_REQUESTS` on `flatMapMerge`, but if Vault is as fast at encrypting as you say, maybe the limiting factor is somewhere else. – Marko Topolnik May 20 '20 at 12:22
  • Yes, exactly, what I'm trying is increasing the number of concurrent requests. I have tried playing with `concurrency = MAX_CONCURRENT_REQUESTS`. Nothing significant in results so far. It's even better leaving it default. – Evgeny Mamaev May 20 '20 at 12:31
  • By the way, is it idiomatic to make a `suspend` function out of a not `suspend` one? With the `suspend {...}` keyword for instance. – Evgeny Mamaev May 20 '20 at 12:42
  • There is no point in ever doing that. You just add overheads without gaining anything. – Marko Topolnik May 20 '20 at 12:56
