I have a Kafka cluster from which I consume two topics and join them. With the result of the join I perform some database operations. All DB operations are asynchronous, so they return a Future (scala.concurrent.Future, which for this purpose is comparable to java.util.concurrent.CompletableFuture). As a result I end up with code like this:
val firstSource: KTable[String, Obj]
val secondSource: KTable[String, Obj2]
def enrich(data: ObjAndObj2): Future[EnrichedObj]
def saveResultToStorage(enrichedData: Future[EnrichedObj]): Future[Unit]
firstSource.leftJoin(secondSource, joinFunc)
  .mapValues(enrich)
  .foreach(saveResultToStorage)
Is it okay to work with Future values inside the stream like this, or is there a better way to handle async tasks (like .mapAsync in Akka Streams)?
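To make the concern concrete: as far as I understand, the last stage of my topology only starts the Future and returns, so the stream moves on to the next record without knowing whether the write succeeded. The only alternative I can think of is blocking on every record. Below is a minimal sketch of both variants in plain Scala, without Kafka. The case classes, the method bodies, the `processAsync`/`processBlocking` helpers and the 5 second timeout are placeholders I made up for illustration, not my real code:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the types in the question.
final case class ObjAndObj2(key: String)
final case class EnrichedObj(key: String, extra: String)

object AsyncStageSketch {
  // Simulated async DB lookup; the real one would query the database.
  def enrich(data: ObjAndObj2): Future[EnrichedObj] =
    Future(EnrichedObj(data.key, "enriched"))

  // Simulated async DB write that completes once enrichment is done.
  def saveResultToStorage(enriched: Future[EnrichedObj]): Future[Unit] =
    enriched.map(_ => ())

  // Variant 1 (what my topology does now): start the work and return
  // immediately. The resulting Future is discarded, so a failed write
  // goes unnoticed by the caller.
  def processAsync(data: ObjAndObj2): Unit = {
    saveResultToStorage(enrich(data))
    ()
  }

  // Variant 2: block the calling thread until the write has finished.
  // Throws if the write failed or the (assumed) timeout elapsed.
  def processBlocking(data: ObjAndObj2,
                      timeout: FiniteDuration = 5.seconds): Unit =
    Await.result(saveResultToStorage(enrich(data)), timeout)
}
```

Variant 2 gives per-record completion and error propagation, but it processes one record at a time per thread, which is what I was hoping something like .mapAsync would avoid.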