
We have an Akka application that consumes from a Kafka topic and sends each received message to an Akka actor. I am not sure that the way I programmed it uses all the benefits of the back pressure mechanism built into Akka Streams.

Following is my configuration...

val control: Consumer.DrainingControl[Done] =
  Consumer
    .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
    .map { consumerRecord =>
      val myAvro = consumerRecord.value().asInstanceOf[MyAvro]

      val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)

      myActor ! Update(myAvro)
    }
    .via(Committer.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
    .toMat(Sink.ignore)(Consumer.DrainingControl.apply)
    .run()

This does what I expect for my business case: myActor receives the Update(MyAvro) commands.

What confuses me more are the technical concepts of back pressure. As far as I understand, the back pressure mechanism is partially controlled by sinks, but in this stream configuration my sink is only 'Sink.ignore'. So my sink is not doing anything for back pressure.

I am also curious when the Akka Kafka stream commits the Kafka topic offset. The moment the command is delivered to the mailbox of MyActor? If so, how can I handle scenarios like ask patterns, where the Kafka offset should not be committed until the ask pattern completes?

I see some factory methods dealing with manual offset control, 'plainPartitionedManualOffsetSource' and 'commitablePartitionManualOffsetSource', but I can't find any examples for those. Can I decide with my business logic to commit the offsets manually?

As an alternative configuration, I can use something like this:

val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("myTopic"))
    .toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
    .run()

Now that I have access to Sink.actorRef, I think the back pressure mechanism has a chance to control back pressure. Naturally this code will not work, because I have no idea how I can access 'myAvro' in this constellation.

Thx for answers..

posthumecaver

2 Answers


In the first stream, there will be basically no backpressure. The offset commit will happen very soon after the message gets sent to myActor.

For backpressure, you'll want to wait for a response from the target actor, and as you say, the ask pattern is the canonical way to accomplish that. Since an ask of an actor from outside an actor (which for all intents and purposes a stream is outside of an actor: that the stages are executed by actors is an implementation detail) results in a Future, this suggests that mapAsync is called for.

def askUpdate(m: MyAvro): Future[Response] = ???  // get actorref from cluster sharding, send ask, etc.

You would then replace the map in your original stream with

.mapAsync(parallelism) { consumerRecord =>
  askUpdate(consumerRecord.value().asInstanceOf[MyAvro])
}

mapAsync limits the "in-flight" futures to parallelism. If there are parallelism futures (spawned by it, of course), it will backpressure. If a spawned future completes with a failure (for the ask itself, this will generally be a timeout), it will fail; the results (respecting incoming order) of the successful futures will be passed on (very often, these will be akka.Done, especially when the only things left to do in the stream are offset commit and Sink.ignore).
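To make that "limits the in-flight futures" behavior concrete, here is a stdlib-only model of mapAsync's semantics (no Akka dependency; the object and method names are mine, not an Akka API). A semaphore with `parallelism` permits stands in for the stream's demand signal: the upstream iterator is only pulled when a permit is free, which is exactly the backpressure.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncModel {
  // Models mapAsync(parallelism): pull one element at a time, start f(a),
  // but never have more than `parallelism` uncompleted Futures in flight.
  // The blocking acquire() is the stand-in for stream backpressure.
  def mapAsyncModel[A, B](parallelism: Int, in: Iterator[A])(f: A => Future[B])
                         (implicit ec: ExecutionContext): Vector[B] = {
    val permits = new Semaphore(parallelism)
    val futures = in.map { a =>
      permits.acquire()                              // wait until a slot frees up
      f(a).andThen { case _ => permits.release() }   // free the slot on completion
    }.toVector                                       // forces the pulls, in order
    // Future.sequence preserves incoming order, like mapAsync does.
    Await.result(Future.sequence(futures), 30.seconds)
  }
}
```

The model also shows why a failed future fails the whole result: `Future.sequence` fails as soon as any member fails, just as mapAsync fails the stream.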

Levi Ramsey
  • First of all, thank you for the answer. If I understand correctly, if 'parallelism' is not greater than 1, then there will again be no backpressure mechanism. So the next question is: what should parallelism be? The classic 'number of cores * 2'? Which will most probably be the same as the dispatcher thread pool size? And how does this interact with the partitions of Kafka, which are the way to create parallelism in Kafka? So if I use 'plainPartitionedManualOffsetSource' instead of 'sourceWithOffsetContext', with 4 Kafka partitions and parallelism 1, can I still get backpressure? – posthumecaver Dec 04 '20 at 07:43
  • If parallelism is 1, there will still be backpressure for the period of time between when the ask is sent and the response received. – Levi Ramsey Dec 04 '20 at 15:13
  • A partitioned source and mapAsync(1) will still backpressure (note that the consumer still batches reads from Kafka for efficiency and will still background poll if backpressured in order to signal liveness to Kafka). That said, I've never really used the partitioned sources. – Levi Ramsey Dec 04 '20 at 15:18
  • The parallelism parameter in a "read-from-Kafka-send-asks-to-actors" stream really just controls the threshold at which you're backpressuring. Thus, as with dispatcher pool sizes, it depends on the particulars of the workload. – Levi Ramsey Dec 04 '20 at 15:24
  • For instance, you're doing asks to cluster sharded actors, so a major parallelism consideration is the distribution of IDs in a particular partition of the stream (if there are hotspots, that will generally decrease the safe parallelism level; this can be ameliorated with some adaptive throttling if you want high parallelism with backpressure only in a hotspot). – Levi Ramsey Dec 04 '20 at 15:37
  • For parallelism, the most general advice I can give is to limit it to number of threads in the dispatcher(s) running the actors divided by number of streams sending asks. If clustering, the number of threads is a sum across the instances in the cluster; if the stream from Kafka is partitioned, the number of streams is the number of partitions (if not, it'd be something like the lesser of number of partitions and number of instances running a stream from that topic) – Levi Ramsey Dec 04 '20 at 15:49
  • For dispatcher size, there are a lot of answers on SO covering that, but the general advice would be: if the actors are performing blocking I/O, the dispatcher can be pretty large; if they're not, keep the dispatcher no larger than the number of cores/hyperthreads. – Levi Ramsey Dec 04 '20 at 15:50
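The sizing rule of thumb from the last two comments (dispatcher threads divided by the number of streams sending asks, never below 1) can be written down directly. This helper is purely illustrative arithmetic, not part of any Akka API:

```scala
object ParallelismSizing {
  // Rule of thumb from the discussion above: cap mapAsync parallelism at
  // (threads in the dispatcher(s) running the actors) / (streams sending asks),
  // with a floor of 1 so the stream always makes progress.
  def suggestedParallelism(dispatcherThreads: Int, askingStreams: Int): Int =
    math.max(1, dispatcherThreads / math.max(1, askingStreams))
}
```

For example, a 16-thread dispatcher shared by a 4-partition stream would suggest a parallelism of 4 per partition.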

This statement is not correct:

... as far as I understand, the back pressure mechanism is partially controlled by sinks, but in this stream configuration my sink is only 'Sink.ignore'. So my sink is not doing anything for back pressure.

There's nothing special about Sinks for backpressure. Backpressure as a flow control mechanism is automatically used anywhere there's an asynchronous boundary in the stream. That MAY be in the Sink, but it may just as well be anywhere else in the stream.

In your case you're hooking up your stream to talk to an actor. That's your asynchronous boundary, but the way you do it is using map, and inside that map you use ! to talk to the actor. So there is no backpressure, because:

  1. map is not an asynchronous operator, and nothing called inside it can participate in the backpressure mechanism. So from the Akka Streams perspective there is NO async boundary introduced.
  2. ! is fire-and-forget; there's no feedback provided about how busy the actor is, so there is nothing to enforce any backpressure.
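The fire-and-forget point can be illustrated with a stdlib-only sketch (this models the idea with a plain queue, not Akka's actual mailbox types): a mailbox fed by tell grows without bound when the producer outpaces the consumer, while a producer that waits for a reply before sending the next message is throttled to the consumer's pace.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object FireAndForgetDemo {
  // `!`-style: enqueue everything immediately, never wait for the consumer.
  def fireAndForget(messages: Int): Int = {
    val mailbox = new ConcurrentLinkedQueue[Int]()
    (1 to messages).foreach(mailbox.offer(_)) // nothing pushes back
    mailbox.size                              // everything piles up
  }

  // ask-style: send, then wait for the "reply" before the next send.
  def askStyle(messages: Int): Int = {
    val mailbox = new ConcurrentLinkedQueue[Int]()
    var maxDepth = 0
    (1 to messages).foreach { m =>
      mailbox.offer(m)
      maxDepth = math.max(maxDepth, mailbox.size)
      mailbox.poll()                          // work acknowledged before next send
    }
    maxDepth                                  // mailbox depth stays bounded
  }
}
```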

Like Levi mentioned, what you can do is change from a tell to an ask interaction and make the receiving actor respond when its work is done. Then you can use mapAsync like Levi describes. The difference between map and mapAsync is that the semantics of mapAsync are such that it only emits downstream when the returned Future completes. Even if parallelism is 1 the backpressure still works: in case your Kafka records come in way faster than your actor can handle them, mapAsync will backpressure upstream while it waits for the Future to complete. Depending on how your actor processes the messages, increasing parallelism for mapAsync may result in increased throughput. The parallelism value effectively caps the max number of uncompleted Futures allowed before backpressure kicks in.

artur
  • I understand what you mean with '.map(_ => msg.committableOffset)', but I think (I am not sure) 'sourceWithOffsetContext' transfers the offsets through the flow to be committed at 'Committer.flowWithOffsetContext' if the Future completes with success. – posthumecaver Dec 04 '20 at 12:53
  • I see, that makes sense. I will edit the answer. Also, having thought about parallelism some more, I think you can still get some throughput benefit if your actor in turn calls some external system, so it's not that it never makes sense to have parallelism > 1 when you send messages to an actor from the stream via `mapAsync` – artur Dec 04 '20 at 15:03
  • @artur, note also the use of cluster sharding (presumably with multiple IDs in the stream). In that case, there's not really a meaningful relationship between the number of messages in any actor's mailbox and the parallelism, which would control the number of asks in-flight to all actors from the stream. – Levi Ramsey Dec 04 '20 at 15:40
  • Yep @LeviRamsey, thx. I missed that. Assumed it was the same instance of the actor. I edited the answer. – artur Dec 04 '20 at 17:11