I am using Java and Akka actors, but need to call into a legacy Java API that looks like the following:
void legacyCall(OutputStream stdout, InputStream stdin, int timeout)
Calling this API blocks until the correct input is received or throws an exception if the timeout is reached.
What I want to do is send a newline into stdin every two seconds until I observe a certain output from stdout. In plain Java, this could be done with a few threads and PipedInputStream/PipedOutputStream, but since my application is using Akka actors, I would like to let Akka manage this task rather than creating a huge blocking actor.
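To make the plain-Java baseline concrete, here is a minimal sketch of the threads-plus-pipes approach. It is only an illustration under stated assumptions: the body of legacyCall below is a hypothetical stand-in (it reads three newlines and then prints READY), and the class name PipedLegacyDriver, the method driveUntil, and the READY marker are invented for this example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class PipedLegacyDriver {

    // Hypothetical stand-in for the legacy API (assumption, not the real call):
    // it consumes three newlines from stdin, then writes "READY" to stdout.
    static void legacyCall(OutputStream stdout, InputStream stdin, int timeout) throws IOException {
        for (int i = 0; i < 3; i++) {
            if (stdin.read() < 0) {
                return; // stdin was closed before enough input arrived
            }
        }
        stdout.write("READY\n".getBytes(StandardCharsets.UTF_8));
        stdout.flush();
    }

    // Sends a newline every periodMillis until a line containing marker shows up
    // on the legacy call's stdout. Returns false if stdout ends without the marker.
    static boolean driveUntil(String marker, long periodMillis, int timeoutSeconds) throws Exception {
        // Pipe 1: our newlines -> legacy stdin. Pipe 2: legacy stdout -> our reader.
        PipedOutputStream toLegacy = new PipedOutputStream();
        PipedInputStream legacyStdin = new PipedInputStream(toLegacy);
        PipedOutputStream legacyStdout = new PipedOutputStream();
        PipedInputStream fromLegacy = new PipedInputStream(legacyStdout);

        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        try {
            // Thread 1: periodic newline writer.
            ScheduledFuture<?> newlines = pool.scheduleAtFixedRate(() -> {
                try {
                    toLegacy.write('\n');
                    toLegacy.flush(); // flush wakes up the blocked reader side
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);

            // Thread 2: the blocking legacy call; closing its stdout signals EOF to us.
            Future<Void> legacy = pool.submit(() -> {
                try (OutputStream out = legacyStdout) {
                    legacyCall(out, legacyStdin, timeoutSeconds);
                }
                return null;
            });

            // Calling thread: scan stdout line by line for the marker.
            BufferedReader reader =
                new BufferedReader(new InputStreamReader(fromLegacy, StandardCharsets.UTF_8));
            boolean seen = false;
            String line;
            while (!seen && (line = reader.readLine()) != null) {
                seen = line.contains(marker);
            }
            newlines.cancel(false);
            if (!seen) {
                legacy.get(); // rethrows a legacy failure wrapped in ExecutionException
            }
            return seen;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Two-second cadence as described above; "READY" is the stand-in's marker.
        System.out.println(driveUntil("READY", 2_000, 30));
    }
}
```

Even in this small form the sketch ties up two pool threads plus the caller for the whole duration of the call, which is the cost I would like to hand over to Akka instead.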
So I am trying to use Akka streams to accomplish this. My understanding so far is that I will need at least two sources, one for reading stdout and one containing my newlines, and I will have at least one sink which is stdin.
I have cobbled together the following (non-working) code:
Source<ByteString, OutputStream> stdout = StreamConverters.asOutputStream();
Sink<ByteString, InputStream> stdin = StreamConverters.asInputStream();
stdout
    .toMat(stdin, Keep.right())
    .mapMaterializedValue(is ->
        try {
            // how to materialize os (OutputStream) here as well?
            legacyCall(os, is, timeoutSeconds);
        } catch (TimeoutException e) {
            // how to handle this
        }
    );
Source<String, NotUsed> newlines = Source.repeat("\n")
    .throttle(1, Duration.create(2, TimeUnit.SECONDS), 1, ThrottleMode.shaping());
I think there are at least a few things that I am missing:
- How do I materialize both the OutputStream and InputStream streams so that I have access to these streams in a single place to pass into legacyCall? It seems like the API only supports materializing one value at a time.
- How do I include my Source of new lines without materializing my java.io streams multiple times?
- Somewhere I will need to be able to parse stdout and signal when I receive the format of output that I am expecting. I am leaning toward the idea that I need two Akka streams, one to pump new lines into stdin and one to parse stdout and somehow "stop" the first stream as needed.
I am not sure if these are even the correct questions to be asking. Any suggestions are appreciated.