Problem
When I restart, complete, or stop the stream, the old consumer does not shut down:
[INFO ] a.a.RepointableActorRef -
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$]
from Actor[akka://ufo-sightings/deadLetters]
to…
I'm trying to implement a reactive Kafka consumer in my Spring Boot application, and I'm looking at these examples:
https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java
and…
I am experiencing a reproducible error while producing Avro messages with reactive Kafka and avro4s. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with
java.lang.IllegalStateException: Too…
I have a reactive Kafka application that reads data from one topic and writes to another. The topic has multiple partitions, and I want to create the same number of consumers (in the same consumer group) as there are partitions in the topic. From what I…
We are using the https://github.com/reactor/reactor-kafka project to implement Spring Reactive Kafka, but we also want to use Kafka retry and recovery logic with it.
Can anyone provide some sample code?
I've just started using Reactor Kafka. I am wondering when and why to use ReactiveKafkaProducerTemplate over KafkaSender, which is the one described in the official Reactor Kafka reference guide.
I am using Spring 5, specifically Project Reactor, to read data from a huge Mongo collection and write it to a Kafka topic. Unfortunately, the Kafka messages are produced much faster than the consuming program can process them. So, I need to implement some…
I am running an Akka Streams Reactive Kafka application which should be functional under heavy load. After running the application for around 10 minutes, the application goes down with an OutOfMemoryError. I tried to debug the heap dump and found…
I am trying to consume messages from Kafka using Akka's reactive-kafka library. One message is printed, and after that I get
[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5]…
I have a reactive Kafka application that reads data from a topic, transforms each message, and writes it to another topic. The topic has multiple partitions, so I am creating multiple consumers to read from it in parallel. Each consumer runs…
Initially triggered via an API call
1. Service A produces m1 to topic1 (non transactional send)
2. Service B consumes topic1 and does some processing
(begin tx)
3. Service B produces m2 to topic2
(commit tx)
4. Service A consumes topic2
(begin…
I've read many articles that describe different configurations for achieving exactly-once processing.
Here is my producer config:
final Map props = Maps.newConcurrentMap();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,…
I need to implement Zipkin tracing in a Java-based service that uses Project Reactor Kafka for reactive streams and non-blocking I/O. I could not find any Brave instrumentation library that supports Reactor Kafka.
The standard…
Given xs: Seq[Source[A, Mat]] and a function to combine individual materializers into a single one, is it possible to merge xs into a single aggregate source which materializes into an aggregate Mat?
Consider this practical example:
Having N Kafka…
I've tried to test my code locally by setting up a Kafka server and sending messages using a producer, but I am wondering if there is a way I can write a unit test for this piece of code (test whether the message received by the consumer is…