I am using Akka 2.4.7 and would like to log the entire HTTP response. My implementation is similar to the one in "How does one log Akka HTTP client requests". The code of concern is the part that extracts the data from the HttpEntity:
def entityAsString(entity: HttpEntity)(implicit m: Materializer, ex: ExecutionContext): Future[String] = {
  entity.dataBytes.map(_.decodeString("UTF-8")).runWith(Sink.head)
}
This works well when the POST request has a small payload, but from 1K upward an exception is thrown:
java.lang.IllegalStateException: Substream Source cannot be materialized more than once
QUESTION: Why does this exception depend on the size of the POST payload? And is there a possible fix?
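For context, here is a minimal sketch of one possible workaround. It assumes the failure comes from reading a streamed (non-strict) entity a second time after entityAsString has already consumed it. The helper name strictEntityAsString and the 3-second default timeout are hypothetical, chosen only for illustration. The idea is to buffer the entity once with toStrict and to pass the returned strict entity on to any later consumer instead of the original one.

```scala
import akka.http.scaladsl.model.HttpEntity
import akka.stream.Materializer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper (not part of Akka HTTP): buffers the whole entity in
// memory and returns the decoded text together with the strict entity.
// A strict entity holds its bytes in memory, so it can be read repeatedly.
def strictEntityAsString(entity: HttpEntity, timeout: FiniteDuration = 3.seconds)
                        (implicit m: Materializer, ex: ExecutionContext): Future[(String, HttpEntity.Strict)] =
  entity.toStrict(timeout).map { strict =>
    // utf8String decodes as UTF-8, matching decodeString("UTF-8") in the original
    (strict.data.utf8String, strict)
  }
```

Unlike Sink.head, which yields only the first chunk of the stream, this collects the complete body. The trade-off is that the entire payload is held in memory, so this is only reasonable when entity sizes are bounded.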
Full log message:
2016-08-11 10:15:35,100 ERROR a.a.ActorSystemImpl [undefined]: Error during processing of request HttpRequest(HttpMethod(POST),http://localhost:3001/api/v2/exec,List(User-Agent: curl/7.30.0, Host: localhost:3001, Accept: */*, Expect: 100-continue, Timeout-Access: <function1>),HttpEntity.Default(multipart/form-data; boundary=-------------------------acebdf13572468; charset=UTF-8,5599,Source(SourceShape(StreamUtils$$anon$2.out), CompositeModule [2db5bfef]
Name: unnamed
Modules:
(unnamed) CompositeModule [4aac8b90]
Name: unnamed
Modules:
(SubSource%28EntitySource%29) GraphStage(EntitySource) [073d36ba]
(unnamed) [155dd7c9] copy of GraphStage(OneHundredContinueStage) [40b6c892]
(unnamed) [1b902132] copy of GraphStage(Collect) [75f65c1c]
(limitable) [76375468] copy of CompositeModule [59626a09]
Name: limitable
Modules:
(unnamed) GraphStage(unknown-operation) [1bee846d]
Downstreams:
Upstreams:
MatValue: Ignore
Downstreams:
SubSource.out -> GraphStage.in
GraphStage.out -> Collect.in
Collect.out -> unknown-operation.in
Upstreams:
GraphStage.in -> SubSource.out
Collect.in -> GraphStage.out
unknown-operation.in -> Collect.out
MatValue: Atomic(SubSource%28EntitySource%29[073d36ba])
(unnamed) [77d6c04c] copy of GraphStage(akka.http.impl.util.StreamUtils$$anon$2@30858cb0) [7e073049]
Downstreams:
SubSource.out -> GraphStage.in
GraphStage.out -> Collect.in
Collect.out -> unknown-operation.in
unknown-operation.out -> StreamUtils$$anon$2.in
Upstreams:
GraphStage.in -> SubSource.out
Collect.in -> GraphStage.out
unknown-operation.in -> Collect.out
StreamUtils$$anon$2.in -> unknown-operation.out
MatValue: Atomic(akka.stream.impl.StreamLayout$CompositeModule[4aac8b90]))),HttpProtocol(HTTP/1.1))
java.lang.IllegalStateException: Substream Source cannot be materialized more than once
at akka.stream.impl.fusing.SubSource$$anon$4.setCB(StreamOfStreams.scala:703)
at akka.stream.impl.fusing.SubSource$$anon$4.preStart(StreamOfStreams.scala:713)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:475)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:380)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.create(ActorCell.scala:590)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)