I'm fairly new to akka and akka-http and am having an issue with sending an HTTP request via singleRequest from an actor.
What I'm trying to accomplish:
- actors calculate values upon receiving a calculation order
- as soon as a calculation is done, another actor (let's call it ReplyActor) is asked to send a "done" HTTP request to another system
- messages are queued; in my setup there is always exactly one ReplyActor alive
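For illustration, a minimal sketch of a ReplyActor along the lines described above. This is not the original code: the `CalculationDone` message, the request body and the full URI are assumptions (only the host, port and path appear in the traces further down), and it targets the akka 2.4.x scaladsl API.

```scala
import akka.actor.{Actor, ActorLogging, Status}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

// Hypothetical message sent by a CalculationActor when it has finished.
final case class CalculationDone(orderId: String)

class ReplyActor extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext used by pipeTo

  private implicit val materializer = ActorMaterializer()
  private val http = Http(context.system)

  def receive: Receive = {
    case CalculationDone(orderId) =>
      // Assumed payload: the order id as the request body.
      val request = HttpRequest(
        method = HttpMethods.POST,
        uri = "http://localhost:4567/example/reply/uri",
        entity = HttpEntity(ContentTypes.`application/json`, orderId))
      // Hand the eventual response (or failure) back to this actor as a message.
      http.singleRequest(request).pipeTo(self)

    case response: HttpResponse =>
      log.info("reply endpoint answered with {}", response.status)
      // akka-http expects every response entity to be consumed, even if it is not needed.
      response.entity.dataBytes.runWith(Sink.ignore)

    case Status.Failure(cause) =>
      log.error(cause, "reply request failed")
  }
}
```

The sketch drains the response entity because the akka-http documentation asks for that on every response. Whether an unconsumed entity has anything to do with the stall described here is not established.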
Everything works fine if I send a single calculation order. With 10 calculation orders, at some point (depending on how much the system is slowed down by debug messages) a request is no longer sent. No exceptions, no timeouts, nothing.
My actor setup is modeled on the distributed master/worker example from the akka examples. While trying to figure out what's going wrong, I'm running just a single worker (one CalculationActor and one ReplyActor).
Well, I've got some more details for you.
First of all, everything also works fine if the request endpoint is written in akka-http. Sadly, it is written in sparkjava, which relies on Jetty. But as far as I can tell the endpoint is not at fault: the request is never sent in the first place.
The source of akka.http.impl.engine.client.PoolConductor#apply contains a diagram of the command flow:
    Request-   +-----------+     +-----------+    Switch-    +-------------+     +-----------+    Command
    Context    |   retry   |     |   slot-   |    Command    |   doubler   |     |   route   +-------------->
    +--------->|   Merge   +---->| Selector  +-------------->| (MapConcat) +---->|  (Flexi   +-------------->
               |           |     |           |               |             |     |   Route)  +-------------->
               +----+------+     +-----+-----+               +-------------+     +-----------+       to slots
                    ^                  ^
                    |                  | SlotEvent
                    |             +----+----+
                    |             | flatten | mapAsync
                    |             +----+----+
                    |                  | RawSlotEvent
                    | Request-         |
                    | Context     +---------+
                    +-------------+  retry  |<-------- RawSlotEvent (from slotEventMerge)
                                  |  Split  |
                                  +---------+
For the requests that are not sent, the slot Selector step is missing from the command chain. I have no idea why, although I've already spent some time debugging. Maybe the following traces help:
Command chain of a successfully sent request:
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 51
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 51
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 49
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolConductor$SwitchSlotCommand
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolConductor$DispatchCommand
threadId: 49
akka.stream.actor.ActorPublisher#onNext
HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
threadId: 78
akka.stream.actor.ActorPublisher#onNext
HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.rendering.RequestRenderingContext
threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): scala.collection.immutable.$colon$colon
threadId: 78
akka.stream.actor.ActorPublisher#onNext
List(ResponseDelivery(ResponseContext(RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567), ...
threadId: 78
Command chain of a request that was not sent (same run):
akka.stream.actor.ActorPublisher#onNext
RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
threadId: 116
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
threadId: 116
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
out: MergePreferred.out,
next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
out: MergePreferred.out,
next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case _
handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
out: MergePreferred.out
next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
threadId: 78
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(0,0))
threadId: 117
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(0,0))
threadId: 117
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(1,0))
threadId: 49
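The two traces diverge at GraphStageLogic#emit. The request that was sent takes the `isAvailable(out)` branch and is pushed. The stuck request takes the `NOT isAvailable(out)` branch and ends up in `setOrAddEmitting` on `MergePreferred.out`. The following toy model is one reading of that distinction, in plain Scala without any akka dependency. `ToyOutlet` and its members are hypothetical names, not akka API. It only illustrates the idea that an emitted element is delivered once downstream signals demand and is parked until then.

```scala
import scala.collection.mutable.ListBuffer

// Toy stand-in for an outlet with back-pressure; NOT akka's implementation.
final class ToyOutlet[T] {
  private var demand = false            // true once downstream has pulled
  private var parked: Option[T] = None  // element waiting for demand
  val delivered: ListBuffer[T] = ListBuffer.empty[T]

  def isAvailable: Boolean = demand

  // Push right away if there is demand, otherwise park the element.
  // The returned string names the branch, mirroring the labels in the traces.
  def emit(elem: T): String =
    if (isAvailable) { push(elem); "push" }
    else { parked = Some(elem); "setOrAddEmitting" }

  // Downstream signals demand; a parked element is delivered only now.
  def pull(): Unit = {
    demand = true
    parked.foreach { elem => parked = None; push(elem) }
  }

  private def push(elem: T): Unit = { delivered += elem; demand = false }
}

object EmitDemo extends App {
  val ready = new ToyOutlet[String]
  ready.pull()                        // downstream already asked for an element
  println(ready.emit("request-1"))    // prints "push"

  val stalled = new ToyOutlet[String]
  println(stalled.emit("request-2"))  // prints "setOrAddEmitting"
  println(stalled.delivered.isEmpty)  // prints "true": nothing delivered yet
  stalled.pull()                      // demand finally arrives
  println(stalled.delivered.toList)   // prints "List(request-2)"
}
```

In this reading, the unsent request is parked at the output of the retry Merge and stays there until the slot Selector pulls again. The trace does not show such a pull, but the model makes no claim about why it is missing.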
I appreciate any help. Thanks!
The Akka version is 2.4.8 (Scala 2.11) for akka-actor, akka-http-core, akka-http-experimental and akka-stream.