I am working with the Kafka Parallel Consumer to consume and process messages, and now I would also like to produce new events to a Kafka topic. This works with the ParallelStreamProcessor, but I cannot make it work with the ReactorProcessor.
Here is the code that works for me:
pConsumer = createPConsumer()
pConsumer.subscribe(UniLists.of(kafkaConsumerConfig.kafkaTopic))
pConsumer.pollAndProduceMany({ something ->
    val records = something.stream().toList()
    records.map { any ->
        println("Consuming ${any.partition()}:${any.offset()}")
        ProducerRecord<String, JsonObject>(
            "output", any.key(),
            JsonObject(mapOf("someTest" to any.offset()))
        )
    }
}, { consumeProduceResult ->
    println(
        "Message ${consumeProduceResult.getOut()} saved to broker at offset " +
            "${consumeProduceResult.getMeta().offset()}"
    )
})
private fun createPConsumer(): ParallelStreamProcessor<String, JsonObject> {
    val producer = KafkaProducerBuilder.getProducer(kafkaConsumerConfig)
    val options = ParallelConsumerOptions.builder<String, JsonObject>()
        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
        .maxConcurrency(parallelConsumerConfig.maxConcurrency)
        .batchSize(parallelConsumerConfig.batchSize)
        .consumer(buildConsumer(kafkaConsumerConfig))
        .producer(producer)
        .build()
    return ParallelStreamProcessor.createEosStreamProcessor(options)
}
I expected the following to send the produced events, but it does not:
pConsumer.react { context ->
    val events = context.stream().toList()
    // do something with events
    val results = events.map { any ->
        ProducerRecord<String, JsonObject>(
            "output", any.key(),
            JsonObject(mapOf("someTest" to any.offset()))
        )
    }
    Mono.just(results)
}
I would appreciate any advice.
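For reference, the only workaround I have come up with so far is to bypass the processor's produce path and call the KafkaProducer directly inside react. This is a sketch, not verified code; it assumes `producer` is the same KafkaProducer instance that was passed to ParallelConsumerOptions, and it relies on `KafkaProducer.send()` returning a `Future<RecordMetadata>`:

```kotlin
// Sketch of a possible workaround (untested): produce manually inside react()
// instead of returning the ProducerRecords. `producer` is assumed to be the
// KafkaProducer already configured on ParallelConsumerOptions.
pConsumer.react { context ->
    Mono.fromCallable {
        context.stream().toList().map { rec ->
            val out = ProducerRecord<String, JsonObject>(
                "output", rec.key(),
                JsonObject(mapOf("someTest" to rec.offset()))
            )
            // send() returns Future<RecordMetadata>; get() blocks until acked
            producer.send(out).get()
        }
    }
}
```

This feels like it defeats the purpose of wiring the producer into the options, though, so I would prefer a way to have the ReactorProcessor publish the returned records itself, as pollAndProduceMany does.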