
I have a Kafka cluster from which I consume two topics and join them. With the result of the join I do some manipulation with a database. All operations on the DB are async, so they return a Future (scala.concurrent.Future, but it is essentially the same as java.util.concurrent.CompletableFuture). As a result I have code like this:

val firstSource: KTable[String, Obj]
val secondSource: KTable[String, Obj2]

def enrich(data: ObjAndObj2): Future[EnrichedObj]
def saveResultToStorage(enrichedData: Future[EnrichedObj]): Future[Unit]

firstSource.leftJoin(secondSource, joinFunc)
           .mapValues(enrich)
           .foreach(saveResultToStorage)

Is it okay to manipulate Future values in the stream, or are there better ways to handle async tasks (like .mapAsync in Akka Streams)?

  • Possible duplicate of [External system queries during Kafka Stream processing](http://stackoverflow.com/questions/42064430/external-system-queries-during-kafka-stream-processing) – Matthias J. Sax Feb 15 '17 at 17:55

1 Answer


I have this same issue. From what I can tell, Kafka Streams is not designed to handle multi-rate streaming the way Akka Streams is: it has no equivalent of Akka Streams' multi-rate primitives such as mapAsync, throttle, conflate, buffer, and batch. Kafka Streams is good at joins between topics and stateful aggregations of data; Akka Streams is good at multi-rate and asynchronous processing.

You have a couple of options for handling this:

  • Make a blocking call in the Kafka Streams app. This is the easiest option, and it is fine as long as the throughput you need does not greatly exceed what serial blocking calls can deliver (roughly one call per latency interval per partition). Kafka Streams processes each partition serially on one of its stream threads, so the partition count of the Kafka topic(s) being processed caps your parallelism.
  • Handle the enrichment in Akka Streams using the Reactive Kafka library: publish the enriched result to another Kafka topic, which you then consume in your Kafka Streams application. This is what we do when the async call has much higher parallel throughput than serial throughput, such as a web service call or a query to a NoSQL database.
  • Publish all your enrichment data to its own KTable and join it in the Kafka Streams app. In fact, joining stream data with enrichment data via KTables is what Kafka Streams is good at. We use this if the enrichment data can be represented as a table. It does not work if the enrichment data must be computed on the fly.
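The first option (blocking) can be sketched directly against the code in the question — a minimal sketch that reuses the asker's enrich, saveResultToStorage, and joinFunc, with a hypothetical 30-second timeout:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Block on each Future inside the topology. Per-partition throughput is
// bounded by call latency, but ordering and error handling stay simple.
firstSource.leftJoin(secondSource, joinFunc)
           .mapValues { data =>
             Await.result(enrich(data), 30.seconds) // hypothetical timeout
           }
           .toStream
           .foreach { (_, enriched) =>
             Await.result(saveResultToStorage(Future.successful(enriched)), 30.seconds)
           }
```

Awaiting inside the topology keeps at-least-once semantics intact, because a record's offset is not committed until the call has completed.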
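The second option — consuming with Reactive Kafka, enriching via mapAsync, and producing to a topic that the Kafka Streams app then reads — might look roughly like this. The topic names, parallelism, bootstrap servers, and the enrichAsString helper are all placeholders, not part of the original question:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("enricher")
import system.dispatcher

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("enricher")
val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

def enrichAsString(value: String): Future[String] = ??? // hypothetical async call

// Run up to 8 enrichment calls concurrently while preserving record order,
// then write the results to a topic the Kafka Streams app consumes.
Consumer.plainSource(consumerSettings, Subscriptions.topics("joined-input"))
  .mapAsync(parallelism = 8)(record => enrichAsString(record.value))
  .map(enriched => new ProducerRecord[String, String]("enriched-output", enriched))
  .runWith(Producer.plainSink(producerSettings))
```

mapAsync is what gives this pipeline parallel throughput: it keeps many Futures in flight at once but still emits results downstream in the original order.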
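The third option — materializing the enrichment data as its own KTable and joining — could be sketched as follows, assuming a hypothetical "enrichment-topic" keyed the same way as the joined stream and a combine function that merges the two values:

```scala
// Hypothetical: enrichment data pre-published to its own compacted topic.
val enrichment: KTable[String, EnrichmentData] = builder.table("enrichment-topic")

// Joining a KTable with a KTable of enrichment data is exactly the kind of
// stateful, co-partitioned operation Kafka Streams is designed for.
val enriched: KTable[String, EnrichedObj] =
  firstSource.leftJoin(secondSource, joinFunc)
             .leftJoin(enrichment,
               (joined: ObjAndObj2, e: EnrichmentData) => combine(joined, e))
```

Note that both tables must be co-partitioned (same key and partition count) for the join to work, and this approach only helps when the enrichment data can be precomputed rather than computed on the fly.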
Charles Crain