3

I'm using akka with a microservice framework, so I've got a lot of completionStage requests. I want to get a list of elements from one microservice and zip them together with a single element from another such that I end up with a Source of Pair<list item, single element>.

I can't do this with a normal zip because Source.zip completes as soon as one of the two sources completes so I only end up with one element propagated.

I can't use Source.zipAll because that requires me to define the default element ahead of time.

If I already had the single element ahead of time I could use Source.repeat to make it repeatedly propagate that one element, meaning that the Source.zip would instead complete when the list of elements completed, but Source.repeat can't take a completion stage or a Source.completionStage.

My current tactic is to zip the things together before I mapConcat the list elements.

Source<singleElement> singleElement = Source.completionStage(oneService.getSingleElement().invoke());

return Source.completionStage(anotherService.getListOfElements().invoke)
    .zip(singleElement)
    .flatMapConcat(pair -> Source.fromIterator(() -> pair.first().stream().map(listElement -> Pair.create(listElement, pair.second())));

This eventually gets to what I want, but I feel like there's a lot of unnecessary duplication and moving around of data synchronously. Is there a better way to solve this problem that I'm missing?

Simon
  • 69
  • 1
  • 6

2 Answers2

4

The flatMapConcat operator should allow you to construct a Source.repeat which repeats the single element once it's known. In Scala (Source.future being the Scala equivalent of Source.completionStage: I'm not familiar enough with Java lambda syntax to answer in Java):

val singleElement = Source.future(oneService.getSingleElement)

Source.future(anotherService.getListOfElements)
  .mapConcat(lst => lst)  // unspool the list
  .zip(singleElement.flatMapConcat(element => Source.repeat(element)))
Levi Ramsey
  • 18,884
  • 1
  • 16
  • 30
1

Why don't you combine the CompletionStages and then feed them to Akka streams?

Source<Pair<String,String>, ?> execute() {
    CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);

    return Source.completionStage(pairCompletionStage)
        .flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
}

A full PoC - play with the sleep timeouts to complete the one or the other CompletionStage first:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class CompletionStages {
    CompletionStage<String> getSingleElement() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "Single Element";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        });
    }

    CompletionStage<List<String>> getListOfElements() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                return Arrays.asList("One", "Two", "Three");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        });
    }

    Source<Pair<String,String>, ?> execute() {
        CompletionStage<Pair<String, List<String>>> pairCompletionStage = getSingleElement().thenCombine(getListOfElements(), Pair::create);

        return Source.completionStage(pairCompletionStage)
                .flatMapConcat(pair -> Source.from(pair.second()).map(listElement -> Pair.create(listElement, pair.first())));
    }

    CompletionStage<Done> run(ActorSystem system) {
        return execute().runWith(Sink.foreach(System.out::println), system);
    }

    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create();
        new CompletionStages().run(system)
                .thenRun(system::terminate);
    }
}
Nikos Paraskevopoulos
  • 39,514
  • 12
  • 85
  • 90