I'm fairly new to Akka and akka-http, and I'm having trouble sending an HTTP singleRequest from an actor.

What I'm trying to accomplish:

  • actors calculate values upon receiving a calculation order
  • as soon as a calculation is done, another actor (let's call it ReplyActor) is asked to send a "done" HTTP request to another system
  • messages are queued; in my setup there is only ever a single ReplyActor alive

Everything works fine if I send a single calculation order. With 10 calculation orders, at some point (depending on how much the system is slowed down by debug messages) a request is simply not sent. No exceptions, no timeouts, nothing.

My actor setup is modeled on the distributed master/worker example from the Akka examples. While trying to figure out what's going wrong, I'm running just a single worker (CalculationActor and ReplyActor).

Well, I've got some more details for you.

First of all, everything also works fine if the request endpoint is written in akka-http. Sadly, it is written in sparkjava, which relies on Jetty. As far as I can tell, though, the endpoint is not at fault: the request is never sent in the first place.

The source of akka.http.impl.engine.client.PoolConductor#apply contains a diagram of the command flow:

Request-   +-----------+     +-----------+    Switch-    +-------------+     +-----------+    Command
Context    |   retry   |     |   slot-   |    Command    |   doubler   |     |   route   +-------------->
+--------->|   Merge   +---->| Selector  +-------------->| (MapConcat) +---->|  (Flexi   +-------------->
           |           |     |           |               |             |     |   Route)  +-------------->
           +----+------+     +-----+-----+               +-------------+     +-----------+       to slots
                ^                  ^
                |                  | SlotEvent
                |             +----+----+
                |             | flatten | mapAsync
                |             +----+----+
                |                  | RawSlotEvent
                | Request-         |
                | Context     +---------+
                +-------------+  retry  |<-------- RawSlotEvent (from slotEventMerge)
                              |  Split  |
                              +---------+

The requests that are not sent are missing the slot-Selector command, and I have no idea why. I've already spent some time debugging; maybe the following is of some help:

Command chain of a successfully sent request:

akka.stream.actor.ActorPublisher#onNext
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 51
akka.stream.actor.ActorPublisher#onNext
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 51
akka.stream.stage.GraphStageLogic#grab (fast path)
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
    threadId: 49
akka.stream.stage.GraphStageLogic#grab (fast path)
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolConductor$SwitchSlotCommand
    threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolConductor$DispatchCommand
    threadId: 49
akka.stream.actor.ActorPublisher#onNext
    HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
    threadId: 78
akka.stream.actor.ActorPublisher#onNext
    HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.rendering.RequestRenderingContext
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): scala.collection.immutable.$colon$colon
    threadId: 78
akka.stream.actor.ActorPublisher#onNext
    List(ResponseDelivery(ResponseContext(RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567), ...
    threadId: 78

Command chain of a request that was not sent (same run):

akka.stream.actor.ActorPublisher#onNext
    RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
    threadId: 116
akka.stream.actor.ActorPublisher#onNext
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
    threadId: 116
akka.stream.stage.GraphStageLogic#grab (fast path)
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
    threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
    handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
    out: MergePreferred.out,
    next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
    threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
    handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
    out: MergePreferred.out,
    next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
    threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case _
    handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
    out: MergePreferred.out
    next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
    threadId: 78
akka.stream.actor.ActorPublisher#onNext
    element: List(Disconnected(0,0))
    threadId: 117
akka.stream.actor.ActorPublisher#onNext
    element: List(Disconnected(0,0))
    threadId: 117
akka.stream.actor.ActorPublisher#onNext
    element: List(Disconnected(1,0))
    threadId: 49

I appreciate any help. Thanks!

The version is 2.4.8 (Scala 2.11) for akka-actor, akka-http-core, akka-http-experimental and akka-stream.


1 Answer

Got it! Looks like I was doing my requests "wrong".

Fails after ~4 requests:

Http http = Http.get(context().system());
ActorMaterializer materializer = ActorMaterializer.create(context().system());
HttpRequest request = HttpRequest.POST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message);

http.singleRequest(request, materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t));

Works:

Http http = Http.get(context().system());
ActorMaterializer materializer = ActorMaterializer.create(context().system());
Flow<HttpRequest, HttpResponse, CompletionStage<OutgoingConnection>> flow = http.outgoingConnection(ConnectHttp.toHost("localhost", 8091));
HttpRequest request = HttpRequest.POST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message);

Source.single(request).via(flow).runWith(Sink.head(), materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t));
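
A possible explanation, which I have not verified against the Akka sources: singleRequest goes through a host connection pool that allows 4 connections by default, and a pool slot is only freed once the response entity has been consumed. The failing snippet only logs the response and never reads its body, so the slots may simply run out after about 4 requests. The connection-level flow opens a fresh connection per request and is not limited by the pool. The following plain-Java sketch is a toy model of that assumption, not Akka code; the class PoolSlotModel and the method dispatched are made-up names for illustration.

```java
import java.util.concurrent.Semaphore;

/** Toy model of a bounded connection pool (hypothetical, not part of Akka). */
class PoolSlotModel {

    /**
     * Pushes `requests` requests through a pool with `maxConnections` slots
     * and returns how many were actually dispatched. A slot is handed back
     * only when the caller consumes the response body.
     */
    static int dispatched(int maxConnections, int requests, boolean consumeBody) {
        Semaphore slots = new Semaphore(maxConnections);
        int sent = 0;
        for (int i = 0; i < requests; i++) {
            // tryAcquire stands in for the pool looking for an idle slot.
            if (!slots.tryAcquire()) {
                // No idle slot: the request just waits. No exception, no timeout.
                continue;
            }
            sent++;
            if (consumeBody) {
                // Body read to the end, so the slot becomes idle again.
                slots.release();
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(dispatched(4, 10, false)); // prints 4
        System.out.println(dispatched(4, 10, true));  // prints 10
    }
}
```

If this model matches what happens in the real pool, draining the response entity in the whenComplete callback (for example by running its data bytes into an ignoring sink) should also make the singleRequest variant work, but I have not tried that.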

Many thanks to this question (and answer), which pointed me in the right direction.
