I am using Kafka Parallel Consumer to consume and process messages, and I would now also like to produce new events to a Kafka topic. This works with `ParallelStreamProcessor`, but I cannot get it to work with `ReactorProcessor`.

Here is the code that is working for me:

    pConsumer = createPConsumer()
    pConsumer.subscribe(UniLists.of(kafkaConsumerConfig.kafkaTopic))
    pConsumer.pollAndProduceMany({ context ->
        val records = context.stream().toList()
        records.map { record ->
            println("Consuming ${record.partition()}:${record.offset()}")
            ProducerRecord<String, JsonObject>(
                "output", record.key(),
                JsonObject(mapOf("someTest" to record.offset()))
            )
        }
    }, { consumeProduceResult ->
        println(
            "Message ${consumeProduceResult.getOut()} saved to broker at offset " +
                "${consumeProduceResult.getMeta().offset()}"
        )
    })

    private fun createPConsumer(): ParallelStreamProcessor<String, JsonObject> {
        val producer = KafkaProducerBuilder.getProducer(kafkaConsumerConfig)
        val options = ParallelConsumerOptions.builder<String, JsonObject>()
            .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
            .maxConcurrency(parallelConsumerConfig.maxConcurrency)
            .batchSize(parallelConsumerConfig.batchSize)
            .consumer(buildConsumer(kafkaConsumerConfig))
            .producer(producer)
            .build()
        return ParallelStreamProcessor.createEosStreamProcessor(options)
    }

I expected the following to send events as well, but it does not:

    pConsumer.react { context ->
        val events = context.stream().toList()
        // do something with events
        val results = events.map { record ->
            ProducerRecord<String, JsonObject>(
                "output", record.key(),
                JsonObject(mapOf("someTest" to record.offset()))
            )
        }
        Mono.just(results)
    }

I would appreciate any advice.

Ehud Lev
  • 1) Why pass "consumer config" to your producer builder? 2) Please clarify "does not work". You've only created records and returned a Mono (which should probably be a Flux, as it's a list of items), not actually sent anything. – OneCricketeer Nov 14 '22 at 14:16
  • 1. I updated to not send events. As for mono vs flux, this is what the API asks for and also I see it in the library example: https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java – Ehud Lev Nov 14 '22 at 15:57
  • `pollAndProduce` seems very explicit about what it does. I don't see it documented anywhere that `react` is supposed to produce any records to Kafka. It may forward to a `Publisher`, but that is a ReactiveX concept? Not an explicit Kafka Producer. – OneCricketeer Nov 14 '22 at 20:38

1 Answer

As of version 0.5.2.4, producing from `react` is not supported (see issue). We implemented it ourselves in the following way, in case it helps anyone:

    // Example usage
    parallelConsumer.react(context -> {
        var consumerRecord = context.getSingleRecord().getConsumerRecord();
        log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
        Map<String, String> params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value());
        Mono<List<ProducerRecord<String, String>>> originalResult =
                Mono.just(List.of(new ProducerRecord<>("topic", "key", "some value")));
        // flatMap rather than map, so the Mono returned by produce() is part of the returned pipeline
        return originalResult.flatMap(batchProducer::produce);
    });

    class BatchProducer<K, V> {
        Producer<K, V> producer;

        public BatchProducer(Producer<K, V> producer) {
            this.producer = producer;
        }

        public Mono<List<RecordMetadata>> produce(List<ProducerRecord<K, V>> messages) {
            List<CompletableFuture<RecordMetadata>> futures = messages.stream().map(message -> {
                CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<RecordMetadata>();
                Callback kafkaCallback = createCallback(completableFuture);
                producer.send(message, kafkaCallback);
                return completableFuture;
            }).toList();
            CompletableFuture<List<RecordMetadata>> oneResult = sequence(futures);
            return Mono.fromFuture(oneResult);
        }

        // From here: https://stackoverflow.com/questions/30025428/convert-from-listcompletablefuture-to-completablefuturelist
        static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
            return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                    .thenApply(v -> com.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList())
                    );
        }

        private Callback createCallback(CompletableFuture<RecordMetadata> completableFuture) {
            return new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        completableFuture.completeExceptionally(exception);
                    } else {
                        completableFuture.complete(metadata);
                    }
                }

            };
        }

        public void close() {
            producer.close();
        }

    }
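
The two building blocks of `BatchProducer` (turning a send callback into a `CompletableFuture`, then combining the futures with `sequence`) can be tried without a broker. The following is only a minimal sketch using the JDK alone: `fakeSend` is a hypothetical stand-in for a callback-style send and is not a Kafka API, and the returned "offset" is just the message length.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class SequenceDemo {

    // Hypothetical stand-in for a callback-style asynchronous send (NOT a Kafka API).
    // Empty messages fail; otherwise the "offset" reported is the message length.
    public static void fakeSend(String message, BiConsumer<Long, Exception> callback) {
        if (message.isEmpty()) {
            callback.accept(null, new IllegalArgumentException("empty message"));
        } else {
            callback.accept((long) message.length(), null);
        }
    }

    // Bridge one callback-style send into a CompletableFuture.
    public static CompletableFuture<Long> sendAsFuture(String message) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        fakeSend(message, (offset, exception) -> {
            if (exception != null) {
                future.completeExceptionally(exception);
            } else {
                future.complete(offset);
            }
        });
        return future;
    }

    // Combine many futures into one future of a list; fails if any input fails.
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Send every message and expose the combined result as a single future.
    public static CompletableFuture<List<Long>> sendAll(List<String> messages) {
        List<CompletableFuture<Long>> futures = messages.stream()
                .map(SequenceDemo::sendAsFuture)
                .collect(Collectors.toList());
        return sequence(futures);
    }

    public static void main(String[] args) {
        System.out.println(sendAll(List.of("a", "bb", "ccc")).join());            // prints [1, 2, 3]
        System.out.println(sendAll(List.of("a", "")).isCompletedExceptionally()); // prints true
    }
}
```

The second call shows the failure behaviour: if any single send fails, the combined future fails too, so a `Mono.fromFuture` wrapped around it would signal an error rather than a partial list.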

Ehud Lev