I have an Actor that sends a HTTP POST request using httpRequest => http.singleRequest(httpRequest).pipeTo(self)
in one case BidRequest
message. The the actor receives back the httpResponse in another case HttpResponse
message. In this second case HttpResponse
message I want to change a variable which the first case BidRequest
message will send back. Because the messages are handled asynchronously, when I edit the variable on the second message, the first message already send back the variable with the old state.
I think I need to use the akka.pattern.ask
in some way to not let the message arrive on another case HttpResponse
, but stay in the same case BidRequest
so I can edit the variable in place.
object AuctionClientActor {
def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}
class AuctionClientActor(bidders: List[String])
extends Actor with ActorLogging
with BidJsonProtocol with SprayJsonSupport {
import context.dispatcher
implicit val system = context.system
val http = Http(system)
var bidOffer: BidOffer = BidOffer("", 0, "")
def receive = {
case bidRequest@BidRequest(requestId, bid) =>
val content = bidRequest.bid.toJson.toString
val latch = new CountDownLatch(bidders.size)
val listResponseFuture: List[Future[HttpResponse]] = bidders
.map(bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
)
// IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
.map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request
listResponseFuture.foreach { response =>
Await.result(response, 3 seconds)
response.onComplete {
case Success(value) => latch.countDown // println(s"response success: $value")
case Failure(exception) =>
println(s"response failure: $exception")
latch.countDown
}
}
latch.await(3, TimeUnit.SECONDS)
println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
sender() ! Some(bidOffer.content)
bidOffer = BidOffer("", 0, "")
case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
log.info(s"received HttpResponse OK(200): $resp")
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
println("Got response, body: " + body.utf8String)
val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
// I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
if (bidOffer.bid == 0) {
println("new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else if (newBidOffer.bid > bidOffer.bid) {
println("replace new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else {
println("none")
}
}
case resp@HttpResponse(code, _, _, _) =>
log.info(s"Request failed, response code: $code")
resp.discardEntityBytes()
}
}
I was looking at this answer to transform a List[Future]
to Future[List]
, but when I do that I create a Future[List[Any]]
and not a HttpResponse
anymore.
Next code piece: So I tried to do the way you said but I am creating a List[Future[Future[String]]]
. If I have only one host to do the request it is easy. But because I can have 1, 2, or 3 requests I create a list and the code get complicated. Plus the runFold
from akka-stream
creates another Future
. Could you give a hint how to implement it in the way that you said?
val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
}
.map { httpRequest =>
http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
.map { httpResponse =>
println(s"response: $httpResponse")
// this creates the second Future
httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
println("Got response, body: " + body.utf8String)
// BidOfferConverter.getBidOffer(body.utf8String)
body.utf8String
}
}
}