I have an actor that sends an HTTP POST request with `httpRequest => http.singleRequest(httpRequest).pipeTo(self)` while handling a `case BidRequest` message. The actor then receives the response in a separate `case HttpResponse` message. In that second case I want to update a variable that the first case (`BidRequest`) sends back to the caller. Because the messages are handled asynchronously, by the time I update the variable in the second case, the first case has already sent the variable back in its old state.

I think I need to use `akka.pattern.ask` somehow, so that the response does not arrive in a separate `case HttpResponse` but stays within the same `case BidRequest`, where I can edit the variable in place.

object AuctionClientActor {
  def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}

class AuctionClientActor(bidders: List[String])
  extends Actor with ActorLogging
    with BidJsonProtocol with SprayJsonSupport {

  import context.dispatcher

  implicit val system = context.system
  val http = Http(system)

  var bidOffer: BidOffer = BidOffer("", 0, "")

  def receive = {
    case bidRequest@BidRequest(requestId, bid) =>
      val content = bidRequest.bid.toJson.toString

      val latch = new CountDownLatch(bidders.size)

      val listResponseFuture: List[Future[HttpResponse]] = bidders
        .map(bidder =>
          HttpRequest( // create the request
            HttpMethods.POST,
            uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
            entity = HttpEntity(ContentTypes.`application/json`, content)
          )
        )
        // IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
        .map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request

      listResponseFuture.foreach { response =>
        Await.result(response, 3 seconds)
        response.onComplete {
          case Success(value) => latch.countDown // println(s"response success: $value")
          case Failure(exception) =>
            println(s"response failure: $exception")
            latch.countDown
        }
      }
      latch.await(3, TimeUnit.SECONDS)
      println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
      sender() ! Some(bidOffer.content)
      bidOffer = BidOffer("", 0, "")
    case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
      log.info(s"received HttpResponse OK(200): $resp")
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        println("Got response, body: " + body.utf8String)
        val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
        // I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
        if (bidOffer.bid == 0) {
          println("new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else if (newBidOffer.bid > bidOffer.bid) {
          println("replace new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else {
          println("none")
        }
      }
    case resp@HttpResponse(code, _, _, _) =>
      log.info(s"Request failed, response code: $code")
      resp.discardEntityBytes()
  }
}

I was looking at this answer to transform a List[Future] into a Future[List], but when I do that I end up with a Future[List[Any]] and no longer have HttpResponse values.

Next code piece: I tried to do it the way you suggested, but I end up with a List[Future[Future[String]]]. With only one host to call it would be easy, but because I can have 1, 2, or 3 requests I build a list and the code gets complicated. On top of that, runFold from akka-stream creates another Future. Could you give me a hint on how to implement it the way you described?

      val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
        HttpRequest( // create the request
          HttpMethods.POST,
          uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
          entity = HttpEntity(ContentTypes.`application/json`, content)
        )
      }
        .map { httpRequest =>
          http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
            .map { httpResponse =>
              println(s"response: $httpResponse")
              // this creates the second Future
              httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
                println("Got response, body: " + body.utf8String)
                // BidOfferConverter.getBidOffer(body.utf8String)
                body.utf8String
              }
            }
        }
Felipe

2 Answers

The short answer is that you can't, short of blocking in a receive, which is a major no-no.

This has the feeling of an X:Y question. What are the actual goals here? Is it just that you don't want the response sent until all the requests have completed?

If that's what you want, then the approach to take is to map the future into a message that includes the information you need to build the response, and pipe that message back to the actor. By doing this, you may not even need the bidOffer variable.
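As a rough sketch of that idea (not the asker's actual code): the actor below captures `sender()` in a `val`, combines the per-bidder futures with `Future.sequence` (described in the next paragraph), maps the result into an internal message, and pipes that message to itself. The names `Offer`, `Auction`, `OffersCollected`, `AuctionSketchActor`, and the `fetchOffer` function are all hypothetical; `fetchOffer` stands in for the HTTP call plus body parsing so that the sketch only depends on akka-actor (classic).

```scala
import akka.actor.{Actor, ActorRef, Props}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical types for illustration; the question's BidRequest/BidOffer differ.
final case class Offer(id: String, bid: Int, content: String)
final case class Auction(requestId: String)
// Internal message: carries everything needed to reply, including who asked.
final case class OffersCollected(replyTo: ActorRef, offers: List[Offer])

// `fetchOffer` is an assumed stand-in for "POST to the bidder and parse the body".
class AuctionSketchActor(bidders: List[String], fetchOffer: String => Future[Offer])
    extends Actor {
  import context.dispatcher

  def receive: Receive = {
    case Auction(_) =>
      // Capture the sender now; sender() must not be called inside Future callbacks.
      val replyTo = sender()
      // A failed bidder becomes None, so one failure does not fail the whole batch.
      val offers: Future[List[Offer]] = Future
        .sequence(bidders.map(b => fetchOffer(b).map(Option(_)).recover { case _ => None }))
        .map(_.flatten)
      // Turn the result into a message and send it back to this actor.
      offers.map(OffersCollected(replyTo, _)).pipeTo(self)

    case OffersCollected(replyTo, offers) =>
      // Handled as an ordinary message: no blocking, no shared mutable variable.
      val winner = if (offers.isEmpty) None else Some(offers.maxBy(_.bid).content)
      replyTo ! winner
  }
}
```

With this shape the reply is only sent once all bidder futures have completed, and the winning offer travels inside the message rather than in a `var`.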

Future.sequence will collapse a Seq[Future[A]] (among other collection types) into a Future[Seq[A]]. It fails if any of the futures fails; if that's not what you're looking for, other combinators in the Future companion object may be a better fit.
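To illustrate the failure behaviour with the standard library only, here is a small sketch. The object name `SequenceSketch` and the integer "bids" are made up for the example; recovering each future to an `Option` is one possible way to tolerate a failed bidder, not the only one.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SequenceSketch {
  // Stand-ins for three bidder calls; the second one fails.
  def calls(): List[Future[Int]] = List(
    Future.successful(10),
    Future.failed(new RuntimeException("bidder down")),
    Future.successful(30)
  )

  // Fails as a whole because one element failed.
  def allOrNothing(): Future[List[Int]] = Future.sequence(calls())

  // Turn each failure into None first, so the combined future always succeeds.
  def successesOnly(): Future[List[Int]] =
    Future
      .sequence(calls().map(f => f.map(Option(_)).recover { case _ => None }))
      .map(_.flatten)
}
```

Here `allOrNothing()` completes with the failure of the second call, while `successesOnly()` keeps the results of the calls that succeeded, in their original order.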

Levi Ramsey
  • `Is it just that you don't want the response sent until all the requests have completed?` exactly. Ok. I will try the approach with a `map`. Thanks. – Felipe Feb 21 '21 at 11:48
  • By the way, do you know of any link in the Akka docs that has such an example with an HTTP request and `map`? The link that I referenced in the question does not use `map`. – Felipe Feb 21 '21 at 11:51
  • I'm not sure there's a doc example, partly because, especially of late, the Akka team doesn't want to blindly encourage closing over mutable state. So save `sender` in a `val` before mapping, and make a defensive copy of anything mutable that you mention in the `map`. – Levi Ramsey Feb 21 '21 at 13:01
  • It's also possible to carry state (e.g. an address to send the next reply) as part of the `Receive` (alias for `PartialFunction[Any, Unit]`) and changing the active `Receive` via `context.become`. This is especially well-suited to an actor with a life-cycle that's only for a single request, but it's a somewhat bigger structural change to your actor. Of note: that approach is effectively what Akka Typed is under the covers. – Levi Ramsey Feb 21 '21 at 13:07
  • Indeed, just saving the `sender` in a `val` works :). I will build it this way, then try to improve it by going stateless and changing the `Receive` via `context.become`. Thanks. – Felipe Feb 21 '21 at 17:16
  • Hi @Levi, could you take a look at my code? I cannot get the values out of the Futures if I only use `map`. If you have a hint on how to implement this in a better way, I would appreciate it. – Felipe Feb 21 '21 at 18:42
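
The `context.become` idea mentioned in the comments above might look roughly like the following. This is only a sketch under assumptions: the messages `Start` and `Bid` and the actor `PerRequestSketch` are hypothetical, and the bids arrive as plain messages rather than HTTP responses. The reply address and the bids collected so far live in the parameters of the active `Receive` instead of in a `var`.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical protocol for illustration only.
final case class Start(expected: Int) // how many bids to wait for
final case class Bid(amount: Int)

class PerRequestSketch extends Actor {
  def receive: Receive = idle

  // No request in flight: wait for a Start and remember who sent it.
  def idle: Receive = {
    case Start(expected) if expected > 0 =>
      context.become(collecting(sender(), expected, Nil))
  }

  // State is carried as arguments of the behaviour, not as mutable fields.
  def collecting(replyTo: ActorRef, remaining: Int, bids: List[Int]): Receive = {
    case Bid(amount) =>
      val all = amount :: bids
      if (remaining == 1) {
        replyTo ! all.max     // last expected bid: answer the original sender
        context.become(idle)  // ready for the next request
      } else {
        context.become(collecting(replyTo, remaining - 1, all))
      }
  }
}
```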

I got it running. I am also using Try, Option, and getOrElse in case a server is down, so I still send an HttpResponse back. I will leave the answer here for completeness. If someone has a better approach, I am happy to rethink it.

class AuctionClientActor(bidders: List[String])
  extends Actor with ActorLogging
    with BidJsonProtocol with SprayJsonSupport {

  import context.dispatcher

  implicit val system = context.system
  val http = Http(system)

  def receive = {
    case bidRequest@BidRequest(requestId, bid) =>
      log.info(s"received bid request: $bidRequest")
      val content = bidRequest.bid.toJson.toString
        .replace("[[", "{")
        .replace("]]", "}")
        .replace("\",\"", "\": \"")
        .replace("[", "")
        .replace("]", "")

      val responseListFuture = bidders.map { bidder =>
        HttpRequest( // create the request
          HttpMethods.POST,
          uri = Uri(bidder),
          entity = HttpEntity(ContentTypes.`application/json`, content)
        )
      }
        .map { httpRequest =>
          val httpResponseFuture = http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future[HttpResponse]
          Await.ready(httpResponseFuture, 5 seconds)
          httpResponseFuture.value.get.getOrElse(HttpResponse(StatusCodes.NotFound))
        }.filter(httpResponse => httpResponse.status == StatusCodes.OK)
        .map { httpResponse =>
          println(s"response: $httpResponse")
          val bidOfferFuture = httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
            println("Got response, body: " + body.utf8String)
            BidOfferConverter.getBidOffer(body.utf8String)
          }
          Await.ready(bidOfferFuture, 5 seconds)
          bidOfferFuture.value.get.getOrElse(BidOffer("", 0, ""))
        }
      responseListFuture.foreach { bidOffer =>
        println(s"bidOffer: ${bidOffer.id}, ${bidOffer.bid}, ${bidOffer.content}")
      }
      val bidOfferWinner = responseListFuture.maxBy(_.bid)
      println(s"winner: $bidOfferWinner")
      sender() ! Some(bidOfferWinner.content)
  }
}
Felipe
  • You're mapping on the list of futures, not the individual futures. I'd probably do something like `Future.sequence(responseListFuture)` to get a `Future[List[HttpResponse]]` and then map over that future for a message to pipe. – Levi Ramsey Feb 21 '21 at 22:15
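
A standard-library sketch of that last suggestion, which also shows how the nested `Future[Future[String]]` from the question can be avoided: use `flatMap` on each individual future to chain the request and the body read, then `Future.sequence` on the resulting list. The functions `request` and `readBody` are hypothetical stand-ins for `http.singleRequest` and `entity.dataBytes.runFold(...)`; they just compute on strings here so the example runs without Akka.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlattenSketch {
  // Assumed stand-in for http.singleRequest: the "response" is the URL length.
  def request(url: String): Future[Int] = Future(url.length)
  // Assumed stand-in for reading the entity body, which is itself asynchronous.
  def readBody(response: Int): Future[String] = Future(s"body-$response")

  // map nests the futures: one per request, another per body read.
  def nested(urls: List[String]): List[Future[Future[String]]] =
    urls.map(u => request(u).map(readBody))

  // flatMap chains both steps into one Future[String] per bidder,
  // and Future.sequence gives a single Future for the whole list.
  def bodies(urls: List[String]): Future[List[String]] =
    Future.sequence(urls.map(u => request(u).flatMap(readBody)))
}
```

Inside an actor, the resulting `Future[List[String]]` would then be mapped into a message and piped to `self` rather than awaited.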