5

I have read Akka streams materialization concept, and understand stream materialization is:

the process of taking a stream description (the graph) and allocating all the necessary resources it needs in order to run.

I followed one example to build my akka stream using mapMaterializedValue to send message to queue. The purpose of the code is to push the message to queue after the stream blue print have build and the code is working but I do not really understand what does mapMaterrializaedValue do in the code:

Promise<SourceQueueWithComplete<String>> promise = new Promise.DefaultPromise<>();

Source<String, SourceQueueWithComplete<String>> s = Source
    .queue(100, OverflowStrategy.fail())
    .mapMaterializaedValue(queue -> {
        promise.trySuccess(queue);
    });

source.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left()).run(materIalizer);

promise.<SourceQueueWithComplete<String>>future().map(mapMapperFunction(), actorSystem.dispatcher());
zt1983811
  • 1,011
  • 3
  • 14
  • 34

1 Answers1

6

The purpose of mapMaterializedValue is to transform the materialized value immediately after it is materialized. For example, suppose you have a third-party library which accepts a callback like this:

interface Callback<T> {
    void onNext(T next);
    void onError(Throwable t);
    void onComplete();
}

Then you can create a method which returns a Source<T, Callback<T>> whose materialized value you can immediately pass to that third-party library when the stream is actually run:

<T> Source<T, Callback<T>> callbackSource() {
    return Source.queue(1024, OverflowStrategy.fail())
        .mapMaterializedValue(queue -> new Callback<T> {
            // an implementation of Callback which pushes the data
            // to the queue
        });
}

Source<Integer, Callback<Integer>> source = callbackSource();

Callback<Integer> callback = source
    .toMat(Sink.foreach(System.out::println), Keep.left())
    .run(materializer);

thirdPartyApiObject.runSomethingWithCallback(callback);

You can see here that this can simplify the code that must use such a kind of a third-party API because you do this queue -> callback transformation only once and encapsulate it in a method.

In your case, however, you don't really need it. You're using mapMaterializedValue to complete an external promise with the materialized value, which is completely unnecessary since you can just use the materialized value after its materialization directly:

Source<String, SourceQueueWithComplete<String>> s = Source
    .queue(100, OverflowStrategy.fail());

SourceQueueWithComplete<String> queue = source
    .toMat(Sink.foreach(x -> System.out.println(x)), Keep.left())
    .run(materIalizer);

mapMapperFunction().apply(queue);
Vladimir Matveev
  • 120,085
  • 34
  • 287
  • 296
  • 1
    Thank you Vladimir, very clear explaination of mapMaterializedValue , now I understand how it works. One more question about materialized value, is this like Future ? – zt1983811 Jun 14 '17 at 15:03
  • 2
    No; materialized values neither are required to be futures (e.g. the queue in these examples is not returned as a future, although there are cases when mat. value is a future), nor are they "similar" to futures - the only such similarity is the substring `map` in the `mapMaterializedValue` and `Future.map` method names, which is because such transformation is almost always called `map`. You can find more about materialized values in my answer [here](https://stackoverflow.com/questions/39727729/akka-streams-what-does-mat-represents-in-sourceout-mat/39729078#39729078). – Vladimir Matveev Jun 14 '17 at 19:25
  • Thanks for the answer I will take a look of that post. – zt1983811 Jun 14 '17 at 19:48