I am using Java and Akka actors, but need to call into a legacy Java API that looks like the following:

void legacyCall(OutputStream stdout, InputStream stdin, int timeout)

Calling this API blocks until the correct input is received or throws an exception if the timeout is reached.

What I want to do is send a newline into stdin every two seconds until I observe a certain output from stdout. In plain Java, this could be done with a few threads and PipedInputStream/PipedOutputStream, but since my application is using Akka actors, I would like to let Akka manage this task rather than creating a huge blocking actor.
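A minimal sketch of the plain-Java approach mentioned above, using only the JDK. The class name `LegacyBridge`, the method `runUntil`, and the body of `legacyCall` are hypothetical: the stub merely imitates a legacy API that prints a prompt, waits for a newline three times, and then prints `READY`. The real API's output format and timeout behaviour are not known here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class LegacyBridge {

    // Hypothetical stand-in for the legacy API (assumption, not the real one):
    // it needs three newlines before it reports "READY". The timeout is ignored.
    static void legacyCall(OutputStream stdout, InputStream stdin, int timeout) throws IOException {
        PrintStream out = new PrintStream(stdout, true, "UTF-8"); // autoflush on println
        BufferedReader in = new BufferedReader(new InputStreamReader(stdin, StandardCharsets.UTF_8));
        for (int i = 1; i <= 3; i++) {
            out.println("waiting " + i);
            if (in.readLine() == null) {
                return; // stdin was closed before we became ready
            }
        }
        out.println("READY");
    }

    // Sends a newline every periodMillis until a line containing marker shows up
    // on the legacy call's stdout. Returns false if stdout ends without the marker.
    static boolean runUntil(String marker, long periodMillis) throws IOException {
        // Pipe 1: what we write to toLegacy is what the legacy call reads as stdin.
        PipedInputStream legacyStdin = new PipedInputStream();
        PipedOutputStream toLegacy = new PipedOutputStream(legacyStdin);
        // Pipe 2: what the legacy call writes to stdout is what we read from fromLegacy.
        PipedInputStream fromLegacy = new PipedInputStream();
        PipedOutputStream legacyStdout = new PipedOutputStream(fromLegacy);

        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService ticker = Executors.newSingleThreadScheduledExecutor();
        try {
            // Thread 1: the blocking legacy call. Closing its stdout afterwards
            // gives the reader below a clean end-of-stream.
            worker.submit(() -> {
                try (OutputStream out = legacyStdout) {
                    legacyCall(out, legacyStdin, 30);
                }
                return null;
            });

            // Thread 2: the periodic newline. Throwing from the task stops the schedule.
            ticker.scheduleAtFixedRate(() -> {
                try {
                    toLegacy.write('\n');
                    toLegacy.flush(); // wakes up the reading side promptly
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);

            // Calling thread: parse stdout line by line until the marker appears.
            BufferedReader reader =
                new BufferedReader(new InputStreamReader(fromLegacy, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains(marker)) {
                    return true;
                }
            }
            return false;
        } finally {
            ticker.shutdownNow(); // stop sending newlines
            worker.shutdownNow(); // interrupt the legacy call if it is still blocked
        }
    }
}
```

Calling `LegacyBridge.runUntil("READY", 2000)` would reproduce the two-second cadence described above. The sketch has no overall deadline: if the legacy call never prints the marker and never returns, the read loop blocks, so a real version would probably want its own timeout around it.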

So I am trying to use Akka streams to accomplish this. My understanding so far is that I will need at least two sources, one for reading stdout and one containing my newlines, and I will have at least one sink which is stdin.

I have cobbled together the following (non-working) code:

Source<ByteString, OutputStream> stdout = StreamConverters.asOutputStream();
Sink<ByteString, InputStream> stdin = StreamConverters.asInputStream();
stdout
  .toMat(stdin, Keep.right())
  .mapMaterializedValue(is -> {
    try {
      // how to materialize os (OutputStream) here as well?
      legacyCall(os, is, timeoutSeconds);
    } catch (TimeoutException e) {
      // how to handle this
    }
    return is;
  });

Source<String, NotUsed> newlines = Source.repeat("\n")
  .throttle(1, Duration.create(2, TimeUnit.SECONDS), 1, ThrottleMode.shaping());

I think there are at least a few things that I am missing:

  1. How do I materialize both the OutputStream and InputStream streams so that I have access to these streams in a single place to pass into legacyCall? It seems like the API only supports materializing one value at a time.
  2. How do I include my Source of newlines without materializing my java.io streams multiple times?
  3. Somewhere I will need to be able to parse stdout and signal when I receive the output format that I am expecting. I am leaning toward the idea that I need two Akka streams, one to pump newlines into stdin and one to parse stdout and somehow "stop" the first stream as needed.

I am not sure if these are even the correct questions to be asking. Any suggestions are appreciated.

Stephen Booher
  • The answer to your first question is to use `Keep.both`, so that the `is` variable in `mapMaterializedValue` becomes a pair holding both the `OutputStream` and the `InputStream`. You can find more explanation e.g. [here](https://stackoverflow.com/questions/39727729/akka-streams-what-does-mat-represents-in-sourceout-mat/39729078#39729078). – Vladimir Matveev Dec 02 '17 at 05:26
  • @Stephen Booher, did you find the right solution? – falcon Oct 08 '21 at 13:22
