Questions tagged [akka-kafka]

akka-kafka is an actor-based Kafka consumer built on top of the high-level Kafka consumer. It allows asynchronous, concurrent processing of messages from Kafka while keeping an upper bound on the number of in-flight messages and explicitly managing offset commits.

27 questions
4
votes
0 answers

Error handling in Akka Kafka Producer

I am using reactive-kafka-core 0.10.1 (targeting Kafka 0.9.x). It looks like the Kafka producer actor is stopped whenever the callback function reports an error. Is there any way to customize this behavior? Our use case is to try to recover…
user1373186
  • 101
  • 4
3
votes
1 answer

Getting last message from kafka Topic using akka-stream-kafka when connecting with websocket

Is it at all possible to get the last message on a Kafka topic using Akka Streams Kafka? I'm creating a websocket that listens to a Kafka topic, but currently it retrieves all prior unread messages when I connect. This can add up to quite a lot…
Martin
  • 352
  • 3
  • 15
3
votes
2 answers

What is a good pattern for committing Kafka consumer offset after processing message?

I am using Akka Streams Kafka to pipe Kafka messages to a remote service. I want to guarantee that the service receives every message exactly once (at-least-once AND at-most-once delivery). Here's the code I came up with: private def…
jacob
  • 2,762
  • 1
  • 20
  • 49
3
votes
1 answer

Reactive-Kafka Stream Consumer: Dead letters occurred

I am trying to consume messages from Kafka using Akka's reactive-kafka library. One message is printed, and after that I get [INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5]…
2
votes
1 answer

Why does auto-commit enabled Kafka client commit latest produced message's offset during consumer close even if the message was not consumed yet?

TLDR: Is committing a produced message's offset as consumed (even if it wasn't) expected behavior for auto-commit enabled Kafka clients (for applications that consume from and produce to the same topic)? Detailed explanation: I have a simple Scala…
kadir
  • 589
  • 1
  • 7
  • 29
2
votes
1 answer

Akka Kafka Consumer processing rate decreases drastically when there is lag in our Kafka partitions

We are facing a scenario where the processing rate of our akka-stream-kafka consumer decreases whenever there is lag. When we start it without any lag in the partitions, the processing rate increases suddenly. MSK cluster - 10 topics - 40 partitions each =>…
1
vote
2 answers

Does this Akka Kafka Stream configuration benefit from the back-pressure mechanism of Akka Streams?

We have an Akka application that consumes from a Kafka topic and sends each received message to an Akka actor. I am not sure that the way I programmed it uses all the benefits of the back-pressure mechanism built into Akka Streams. Following is my…
posthumecaver
  • 1,584
  • 4
  • 16
  • 29
1
vote
0 answers

How do Akka Streams internal and explicit buffers interact with the underlying Kafka client settings in Alpakka Kafka?

I am trying to use an Akka Streams buffer to improve the throughput of my stream, and I'm wondering how it applies to Kafka Consumer.committableSource(consumerSettings, Subscriptions.topics(topic)). In particular, val kafkaSource = …
MaatDeamon
  • 9,532
  • 9
  • 60
  • 127
1
vote
1 answer

Troubles with AVRO schema update

I have a simple case class: case class User(id: String, login: String, key: String). I add a field "name": case class User(id: String, login: String, name: String, key: String), then add this field to the Avro schema (user.avsc): { "namespace":…
HoTicE
  • 573
  • 2
  • 13
1
vote
1 answer

Incompatible equality constraint while using Akka Kafka Streams

I am trying to use Akka Kafka Streams following the Akka Kafka Streams documentation. Here is the code I have: ConsumerSettings consumerSettings = ConsumerSettings .create(actorSystem, new…
Prasanth
  • 1,005
  • 5
  • 19
  • 39
1
vote
1 answer

Akka streams kafka commit offset after filter

I am trying to implement an at-least-once commit strategy for offsets in Akka Streams, and I cannot work out the expected pattern for cases where I use filter on my stream. My expectation is that none of the filtered messages will get…
Kanekotic
  • 2,824
  • 3
  • 21
  • 35
1
vote
2 answers

Akka Kafka ProducerSettings: overloaded method value apply with alternatives

I keep running into a problem whenever I put producer settings in my code. Without them, everything works fine. Below is the single file containing all the code. I am trying to write a file to a Kafka stream and am getting this…
tnkteja
  • 11
  • 2
1
vote
3 answers

Akka Stream TCP + Akka Stream Kafka producer not stopping, not publishing messages, and not erroring out

I have the following stream: Source(IndexedSeq(ByteString.empty)) .via( Tcp().outgoingConnection(bsAddress, bsPort) .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) .map(_.utf8String) ) .map(m => new…
Darien
  • 414
  • 2
  • 15
1
vote
0 answers

Chain Akka Streams Kafka with Akka-http by Source[ByteString]

I'm receiving a file as a ByteString from a Kafka Reactive Streams consumer, which I want to send to a service. Is there a way to extract Source[ByteString, Any] from the Kafka Reactive Streams consumer, so that I can chain the stream from Kafka to…
Rabzu
  • 52
  • 5
  • 26
0
votes
1 answer

How to group messages so that they go to the same partition every time?

We are using Java, the Akka framework, and Azure Event Hubs. We use Akka's groupedWeightedWithin to group messages based on the total number of bytes or duration. I don't know how to set the partition key so that a particular device's…
Venky
  • 177
  • 3
  • 13