akka-kafka is an actor-based Kafka consumer built on top of the high-level Kafka consumer. It allows asynchronous/concurrent processing of messages from Kafka while keeping an upper bound on the number of in-flight messages and explicitly managing offset commits.
Questions tagged [akka-kafka]
27 questions
4 votes · 0 answers
error handling in Akka Kafka Producer
I am using reactive-kafka-core 0.10.1 (targeting Kafka 0.9.x). It looks like the Kafka producer actor is stopped whenever an error is encountered in the callback function. Is there any way to customize this behavior? Our use case is to try to recover…

user1373186
- 101
- 4
3 votes · 1 answer
Getting the last message from a Kafka topic using akka-stream-kafka when connecting with a websocket
Is it at all possible to get the last message on a Kafka topic using Akka Streams Kafka? I'm creating a websocket which listens to a Kafka topic, but currently it retrieves all prior unread messages when I connect. This can add up to quite a lot…

Martin
- 352
- 3
- 15
3 votes · 2 answers
What is a good pattern for committing Kafka consumer offset after processing message?
I am using Akka Streams Kafka to pipe Kafka messages to a remote service. I want to guarantee that the service receives every message exactly once (at-least-once AND at-most-once delivery).
Here's the code I came up with:
private def…
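The excerpt above is truncated, but the property being asked for hinges on ordering alone: perform the side effect first, commit the offset second. A minimal model in plain Java (no Kafka dependency; the offset field and `delivered` list are hypothetical stand-ins for Kafka's committed offset and the remote service) shows why this ordering gives at-least-once delivery:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of at-least-once delivery: commit the offset only AFTER
// the side effect succeeds. A crash between the two steps causes a
// redelivery (a duplicate), never a loss.
public class AtLeastOnceModel {
    long committedOffset = -1;                        // stand-in for Kafka's committed offset
    final List<String> delivered = new ArrayList<>(); // stand-in for the remote service

    /** Process records from committedOffset + 1 onward; simulate a crash after processing
     *  offset `crashAt` but before committing it (pass a negative value for no crash). */
    public void run(String[] records, int crashAt) {
        for (int offset = (int) committedOffset + 1; offset < records.length; offset++) {
            delivered.add(records[offset]); // 1. side effect (send to remote service)
            if (offset == crashAt) return;  // simulated crash BEFORE the commit
            committedOffset = offset;       // 2. commit only after success
        }
    }
}
```

Restarting after the crash resumes from `committedOffset + 1`, so the record processed at `crashAt` is delivered a second time. That duplicate is exactly the at-least-once guarantee; getting effectively-once on top of it requires an idempotent or deduplicating consumer, not a different commit order.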

jacob
- 2,762
- 1
- 20
- 49
3 votes · 1 answer
Reactive-Kafka Stream Consumer: Dead letters occurred
I am trying to consume messages from Kafka using Akka's Reactive Kafka library. I get one message printed, and after that I get
[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5]…

anshul_cached
- 684
- 5
- 18
2 votes · 1 answer
Why does an auto-commit-enabled Kafka client commit the latest produced message's offset during consumer close, even if the message was not consumed yet?
TLDR:
Is committing a produced message's offset as consumed (even if it wasn't) expected behavior for auto-commit-enabled Kafka clients? (This is for applications that consume and produce the same topic.)
Detailed explanation:
I have a simple scala…

kadir
- 589
- 1
- 7
- 29
2 votes · 1 answer
Akka Kafka Consumer processing rate decreases drastically when there is lag in our Kafka partitions
We are facing a scenario where our akka-stream-kafka consumer's processing rate decreases whenever there is lag. When we start it with no lag in the partitions, the processing rate rises immediately.
MSK cluster - 10 topics - 40 partitions each =>…

Abhinandan Sanduja
- 21
- 4
1 vote · 2 answers
Does this Akka Kafka Streams configuration benefit from the back-pressure mechanism of Akka Streams?
We have an Akka application that consumes from a Kafka topic and sends the received messages to an Akka actor. I am not sure that the way I programmed it uses the full benefits of the back-pressure mechanism built into Akka Streams.
Following is my…
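Whether a stream benefits from back-pressure comes down to one contract: the downstream signals demand, and the upstream emits no more elements than were demanded. A toy model of that contract in plain Java (no Akka dependency; `BoundedStage` is a hypothetical name for illustration) makes the invariant concrete:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the Reactive Streams demand contract behind Akka Streams
// back-pressure: the downstream requests n elements, and the upstream may
// emit at most n before it must wait for more demand.
public class BoundedStage {
    private long demand = 0;
    private final Deque<Integer> buffer = new ArrayDeque<>();

    /** Downstream signals readiness for n more elements. */
    public void request(long n) { demand += n; }

    /** Upstream tries to emit; returns false (back-pressure) when demand is exhausted. */
    public boolean offer(int element) {
        if (demand == 0) return false; // back-pressure: upstream must slow down
        demand--;
        buffer.add(element);
        return true;
    }

    public Integer poll() { return buffer.poll(); }
}
```

Handing messages to a plain actor via `Sink.actorRef` breaks this contract, because the actor's mailbox is unbounded and absorbs elements without signaling demand; keeping the demand signal intact requires an acknowledging sink (`Sink.actorRefWithBackpressure` in recent Akka versions) or processing inside the stream with bounded `mapAsync` parallelism.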

posthumecaver
- 1,584
- 4
- 16
- 29
1 vote · 0 answers
How do Akka Streams' internal and explicit buffers interact with the underlying Kafka client settings in Alpakka Kafka?
I am trying to use an Akka Streams buffer to improve the throughput of my stream, and I'm wondering how it applies to
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
in particular,
val kafkaSource =
…

MaatDeamon
- 9,532
- 9
- 60
- 127
1 vote · 1 answer
Troubles with AVRO schema update
I have a simple case class:
case class User(id: String, login: String, key: String)
I am adding a field "name":
case class User(id: String, login: String, name: String, key: String)
and then adding this field to the Avro schema (user.avsc):
{
"namespace":…
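For Avro schema evolution, a field added to the schema must carry a default value so records written before the change can still be decoded; without it, reading old data fails with a schema-resolution error. A hedged sketch of the relevant `user.avsc` shape (the namespace value is truncated in the question, so `example` here is a placeholder, and the empty-string default is an assumption):

```json
{
  "namespace": "example",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id",    "type": "string"},
    {"name": "login", "type": "string"},
    {"name": "name",  "type": "string", "default": ""},
    {"name": "key",   "type": "string"}
  ]
}
```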

HoTicE
- 573
- 2
- 13
1 vote · 1 answer
Incompatible equality constraint while using Akka Kafka Streams
I am trying to use Akka Kafka Streams following the Akka Kafka Streams documentation. Here is the code I have:
ConsumerSettings consumerSettings = ConsumerSettings
.create(actorSystem, new…

Prasanth
- 1,005
- 5
- 19
- 39
1 vote · 1 answer
Akka Streams Kafka: commit offset after filter
I am trying to implement an at-least-once offset commit strategy in Akka Streams, and I cannot work out the expected pattern for cases where I use filter on my stream.
My expectation is that none of the filtered messages will get…
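The pitfall behind this question is that dropping a committable message with `filter` also drops its offset, so the committed offset can stall at the last message that happened to pass the predicate. A plain-Java model of the fix (no Kafka dependency; process only the kept messages, but fold every offset into the commit):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Model of "commit after filter": filtered-out messages must still have
// their offsets committed, otherwise the committed offset stalls and the
// dropped messages are redelivered after every restart.
public class FilterCommit {
    public final List<String> processed = new ArrayList<>();
    public long committedOffset = -1;

    public void run(String[] records, Predicate<String> keep) {
        for (int offset = 0; offset < records.length; offset++) {
            if (keep.test(records[offset])) {
                processed.add(records[offset]); // only kept messages do real work
            }
            committedOffset = offset;           // every offset gets committed
        }
    }
}
```

In Alpakka Kafka this is typically expressed by not filtering committable messages out of the stream at all: messages failing the predicate are mapped to their committable offset only (or diverted to a branch that skips processing), and both branches feed the same committer.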

Kanekotic
- 2,824
- 3
- 21
- 35
1 vote · 2 answers
Akka Kafka ProducerSettings: overloaded method value apply with alternatives
I run into a problem again and again when I put producer settings in my code. When I don't have them, everything works fine. Below is a single file containing all the code; I am trying to write a file to a Kafka stream. And I'm getting this…

tnkteja
- 11
- 2
1 vote · 3 answers
Akka Stream TCP + Akka Stream Kafka producer not stopping, not publishing messages, and not erroring out
I have the following stream:
Source(IndexedSeq(ByteString.empty))
  .via(
    Tcp().outgoingConnection(bsAddress, bsPort)
      .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
      .map(_.utf8String)
  )
  .map(m => new…

Darien
- 414
- 2
- 15
1 vote · 0 answers
Chain Akka Streams Kafka with Akka HTTP by Source[ByteString]
I'm receiving a file as a ByteString from a Kafka Reactive Streams consumer, which I want to send to a service.
Is there a way to extract Source[ByteString, Any] from the Kafka Reactive Streams consumer, so that I can chain the stream from Kafka to…

Rabzu
- 52
- 5
- 26
0 votes · 1 answer
How to group messages so that they go to the same partition every time?
We are using Java, the Akka framework, and Azure Event Hubs. We use Akka Streams' groupedWeightedWithin to group messages based on the total number of bytes or a duration. I don't know how to set the partition key so that a particular device's…
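Kafka-style brokers route a record by hashing its key, so records with equal keys always land on the same partition as long as the partition count doesn't change. A simplified Java model of that routing (Kafka's real default partitioner uses murmur2 rather than `hashCode`; the point here is only the determinism per key, e.g. using a device ID as the key):

```java
// Simplified stand-in for Kafka's default partitioner: a deterministic
// hash of the key modulo the partition count. Real Kafka uses murmur2,
// but the guarantee is the same: equal keys -> equal partition.
public class KeyPartitioner {
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

For the grouped elements produced by groupedWeightedWithin, keying each outgoing batch by the device ID (as the record key, or as the partition key when producing to Event Hubs directly) keeps each device's data on one partition and therefore in order.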

Venky
- 177
- 3
- 13