Questions tagged [reactive-kafka]

Akka Streams Kafka, also known as Reactive Kafka, is an Akka Streams connector for Apache Kafka.

User guide: http://doc.akka.io/docs/akka-stream-kafka/current/home.html

Source code: https://github.com/akka/reactive-kafka

62 questions
13
votes
1 answer

Gracefully restart a Reactive-Kafka Consumer Stream on failure

Problem When I restart/complete/STOP stream the old Consumer does not Die/Shutdown: [INFO ] a.a.RepointableActorRef - Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://ufo-sightings/deadLetters] to…
Rabzu
  • 52
  • 5
  • 26
9
votes
1 answer

Implement Reactive Kafka Listener in Spring Boot application

I'm trying to implement reactive kafka consumer in my Spring boot application and I'm looking at these examples: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java and…
7
votes
1 answer

How to ensure constant Avro schema generation and avoid the 'Too many schema objects created for x' exception?

I am experiencing a reproducible error while producing Avro messages with reactive kafka and avro4s. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with java.lang.IllegalStateException: Too…
kostja
  • 60,521
  • 48
  • 179
  • 224
5
votes
1 answer

How to create multiple instances of KafkaReceiver in Spring Reactor Kafka

I have a reactive kafka application that reads data from a topic and writes to another topic. The topic has multiple partitions and I want to create the same number of consumers(in the same consumer group) as the partitions in the topic. From what I…
4
votes
1 answer

How to implement retry and recover logic with Spring Reactive Kafka

We are using the https://github.com/reactor/reactor-kafka project for implementing Spring Reactive Kafka. But we want to utilize Kafka retry and recover logic with reactive Kafka. Can anyone provide some sample code?
3
votes
1 answer

Reactor Kafka: ReactiveKafkaProducerTemplate

I've just started using Reactor Kafka. I am wondering when and why use ReactiveKafkaProducerTemplate over KafkaSender which is found in official reactor kafka reference guide?
user1955934
  • 3,185
  • 5
  • 42
  • 68
3
votes
1 answer

Limit the throughput of a Reactor Flux reading a Mongodb collection

I am using Spring 5, in detail the Reactor project, to read information from a huge Mongo collection to a Kafka topic. Unfortunately, the production of Kafka messages is much faster than the program that consumes them. So, I need to implement some…
riccardo.cardin
  • 7,971
  • 5
  • 57
  • 106
3
votes
1 answer

Akka Streams Reactive Kafka - OutOfMemoryError under high load

I am running an Akka Streams Reactive Kafka application which should be functional under heavy load. After running the application for around 10 minutes, the application goes down with an OutOfMemoryError. I tried to debug the heap dump and found…
dks551
  • 1,113
  • 1
  • 15
  • 39
3
votes
1 answer

Reactive-Kafka Stream Consumer: Dead letters occured

I am trying to consume messages from Kafka using akka's reactive kafka library. I am getting one message printed and after that I got [INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5]…
2
votes
1 answer

Multi threading on Kafka Send in Spring reactor Kafka

I have a reactive kafka application that reads data from a topic, transforms the message and writes to another topic. I have multiple partitions in the topic so I am creating multiple consumers to read from the topics in parallel. Each consumer runs…
2
votes
1 answer

Reactive Kafka : Exactly Once Processing with Transaction

Initially triggered via api call 1. Service A produces m1 to topic1 (non transactional send) 2. Service B consumes topic1 and does some processing (begin tx) 3. Service B produces m2 to topic2 (commit tx) 4. Service A consumes topic2 (begin…
user1955934
  • 3,185
  • 5
  • 42
  • 68
2
votes
1 answer

Reactor Kafka: Exactly Once Processing Sample

I've read many articles where there are many different configurations to achieve exactly once processing. Here is my producer config: final Map props = Maps.newConcurrentMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,…
user1955934
  • 3,185
  • 5
  • 42
  • 68
2
votes
1 answer

Zipkin Tracing for Project Reactor Kafka

I need to implement Zipkin tracing in one java-based service which is using Project Reactor Kafka for reactive streams and non-blocking IO operations. I could not find any brave instrumentation library which supports reactive-Kafka. The standard…
2
votes
1 answer

Combining arbitrary number of sources with materialized values

Given xs: Seq[Source[A, Mat]] and a function to combine individual materializers into a single one, is it possible to merge xs into a single aggregate source which materializes into an aggregate Mat? Consider this practical example: Having N Kafka…
tkroman
  • 4,811
  • 1
  • 26
  • 46
2
votes
1 answer

Akka Streams Kafka - unit test for consumer

I've tried to test my code locally by setting up a Kafka server and sending messages using a producer, but I am wondering if there is a way I can write a unit test for this piece of code (test whether the message received by the consumer is…
Isa
  • 353
  • 2
  • 18
1
2 3 4 5