I run a Netty-based Kotlin application with Spring Boot and WebFlux. The details are as follows:
- Java 11;
- Kotlin 1.3.61;
- Spring Boot 2.2.5.RELEASE;
- Spring Vault Core 2.2.2.RELEASE.
I receive a file on the web layer. WebFlux creates a Part (org.springframework.http.codec.multipart) out of it. The data is stored in the Part as a Project Reactor Flux, a stream of DataBuffer chunks of 4 KB each:
Flux<DataBuffer> content();
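To try the pipeline without WebFlux, the stream of 4 KB chunks can be imitated with a plain ByteArray. A minimal sketch; `splitIntoChunks` is a hypothetical helper, not part of Spring or Reactor, and in the real application the chunk boundaries are decided by the multipart reader:

```kotlin
// Hypothetical stand-in for Part.content(): splits data into chunks of at most chunkSize bytes,
// in file order, the last chunk possibly being shorter
fun splitIntoChunks(data: ByteArray, chunkSize: Int = 4096): List<ByteArray> {
    require(chunkSize > 0) { "chunkSize must be positive" }
    return (data.indices step chunkSize).map { start ->
        data.copyOfRange(start, minOf(start + chunkSize, data.size))
    }
}
```

For example, a 10,000-byte file yields chunks of 4096, 4096 and 1808 bytes.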
To stay consistent with the rest of the frameworks, I transform the Flux into a Kotlin Flow.
Then I use the synchronous Vault client's encrypt(...), submitting the chunks asynchronously (as far as I understand) within the flatMapMerge method. Note that encrypt(...) is not a suspend function; it is a wrapper on top of an HTTP client to a remote encryption provider:
public String encrypt(String keyName, String plaintext);
I have checked this answer https://stackoverflow.com/a/58659423/6612401 and found that the flow-based approach should be used with flow { emit(...) }.
My question is: can I use this flow-based approach with non-suspend functions? Or is there a better approach, considering that I am using runBlocking(Dispatchers.IO) and the suspend fold(...) function?
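Because encrypt(...) is blocking, each in-flight request occupies a thread for its whole network round trip, whether it is called inside flow { emit(...) } or anywhere else. One alternative outside of Flow is a dedicated fixed-size thread pool with CompletableFuture, which bounds the number of concurrent blocking calls explicitly and also returns results in chunk order. A minimal JDK-only sketch; `mapBlockingInOrder` is a hypothetical helper and `call` stands in for the Vault encrypt call:

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Hypothetical helper: runs a blocking call for every item on a pool of `parallelism`
// threads and returns the results in the same order as the input
fun <T, R> mapBlockingInOrder(items: List<T>, parallelism: Int, call: (T) -> R): List<R> {
    require(parallelism > 0) { "parallelism must be positive" }
    val pool = Executors.newFixedThreadPool(parallelism)
    try {
        // Submit everything up front; at most `parallelism` calls execute at the same time
        val futures = items.map { item -> CompletableFuture.supplyAsync({ call(item) }, pool) }
        // join() in submission order, so the output order matches the input order
        return futures.map { it.join() }
    } finally {
        pool.shutdown()
    }
}
```

With 4 KB chunks, `call` could be something like `{ chunk -> vaultTransitTemplate.encrypt(KEY_NAME, Base64Utils.encodeToString(chunk)) }`. This sketch holds all chunks in memory, as the fold(...) in the question already does.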
The code is as follows:
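import java.io.ByteArrayOutputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import org.springframework.http.codec.multipart.Part
import org.springframework.util.Base64Utils

@FlowPreview
@ExperimentalCoroutinesApi
private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
    val pair = part.content().asFlow()
        .flatMapMerge { dataBuffer -> // up to 16 inner flows at once by default; output follows completion order
            // asInputStream(true) releases the DataBuffer when the stream is closed
            val openByteArray = dataBuffer.asInputStream(true).use { it.readBytes() }
            val opentextBase64 = Base64Utils.encodeToString(openByteArray)
            // blocking HTTP call to Vault, made when the inner flow is collected
            flow { emit(Pair(openByteArray, vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
        }.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
            result.first.writeBytes(curPair.first) // open chunks, hashed later
            result.second.append(curPair.second)  // encrypted chunks
            result
        }
    Pair(pair.first.toByteArray(), pair.second.toString())
}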
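Because flatMapMerge emits inner results in completion order, the concatenated open bytes and ciphertexts may not follow the original file order (each pair stays aligned, but the pairs can be shuffled). One way to keep the concurrency and still restore the order is to tag each chunk with its position (Flow provides withIndex() for this) and sort by that position before concatenating. A minimal JDK-only sketch of the reassembly step; `EncryptedChunk` and `reassemble` are hypothetical names:

```kotlin
import java.io.ByteArrayOutputStream

// Hypothetical result of encrypting one chunk: its position in the file, the open bytes, the ciphertext
class EncryptedChunk(val index: Int, val open: ByteArray, val cipherText: String)

// Reassembles chunks that may have arrived in any order (e.g. from flatMapMerge)
fun reassemble(chunks: Collection<EncryptedChunk>): Pair<ByteArray, String> {
    val openBytes = ByteArrayOutputStream()
    val cipherText = StringBuilder()
    for (chunk in chunks.sortedBy { it.index }) {
        openBytes.write(chunk.open, 0, chunk.open.size)
        cipherText.append(chunk.cipherText)
    }
    return Pair(openBytes.toByteArray(), cipherText.toString())
}
```

Sorting requires all chunks to be collected first, which the fold(...) above already does.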
P.S. The fold(...) function collects the open chunks into a ByteArrayOutputStream to calculate a hash later, and it collects the encrypted chunks into a StringBuilder as the result of encrypting the file.
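If the open bytes are buffered only to compute a hash, the digest can instead be updated chunk by chunk: feeding the chunks to a MessageDigest in file order gives the same result as hashing the whole file at once. A minimal sketch, assuming SHA-256 (the post does not say which hash is used); `hashChunks` is a hypothetical helper:

```kotlin
import java.security.MessageDigest

// Hashes chunks incrementally; the chunks must be supplied in file order
fun hashChunks(chunks: Sequence<ByteArray>, algorithm: String = "SHA-256"): ByteArray {
    val digest = MessageDigest.getInstance(algorithm)
    chunks.forEach { digest.update(it) }
    return digest.digest()
}
```

Note that this only matches the whole-file hash if the chunks arrive in their original order.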
P.P.S. I have tried my approach. The method submits 5-7 parallel requests on average on my machine (8th-gen Core i5 with 4 physical cores). It does its job, but not very fast. With Vault deployed remotely, I get roughly 1 second per 1 MB of encryption. I understand that this depends on network latency. I am not even considering the speed of encryption on the Vault side; it is lightning fast because the chunks are only 4 KB. Are there any ways to increase the speed concurrency-wise?
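The numbers above are consistent with a latency-bound pipeline. With 4 KB chunks, 1 MB (taken as 1024 * 1024 bytes) needs 256 requests; by Little's law (requests in flight = throughput × latency), 256 requests per second with about 6 in flight implies roughly 6 / 256 ≈ 23 ms per round trip. A back-of-the-envelope sketch, assuming that per-request latency stays constant regardless of chunk size and concurrency (in practice it will grow with payload size and server load); the function names are hypothetical:

```kotlin
// Number of encrypt requests needed for 1 MB (1024 * 1024 bytes) of plaintext
fun requestsPerMiB(chunkSize: Int): Int = (1024 * 1024 + chunkSize - 1) / chunkSize

// Little's law: average latency = requests in flight / throughput (requests per second)
fun impliedLatencySeconds(inFlight: Double, requestsPerSecond: Double): Double =
    inFlight / requestsPerSecond

// Estimated seconds per MB, assuming a fixed per-request latency
fun secondsPerMiB(chunkSize: Int, inFlight: Int, latencySeconds: Double): Double =
    requestsPerMiB(chunkSize) * latencySeconds / inFlight
```

Under that assumption, 16 requests in flight would give about 0.375 s per MB, and 64 KB chunks would need only 16 requests per MB; whether Vault and the HTTP client sustain that is not verified here.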
P.P.P.S. I have tried setting concurrency = MAX_CONCURRENT_REQUESTS in flatMapMerge {...}. Nothing significant in the results so far; it is even better to leave it at the default.
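Since raising the concurrency of flatMapMerge (its default is 16) did not help, the observed cap of 5-7 requests may come from elsewhere, for example the thread or connection pool of the HTTP client behind the Vault template, whose configuration is not shown in the post. One way to check this hypothesis is to measure how many calls are actually in flight. A minimal sketch; `InFlightMeter` is a hypothetical wrapper that could be placed around the encrypt(...) call:

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical diagnostic wrapper: counts concurrent calls and remembers the peak
class InFlightMeter {
    private val current = AtomicInteger()
    private val peak = AtomicInteger()

    fun <R> measure(call: () -> R): R {
        val now = current.incrementAndGet()
        peak.accumulateAndGet(now) { a, b -> maxOf(a, b) }
        try {
            return call()
        } finally {
            current.decrementAndGet()
        }
    }

    val peakInFlight: Int
        get() = peak.get()
}
```

If the peak stays around 5-7 even with a higher flatMapMerge concurrency, the limit is probably below the Flow layer rather than in it.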