I need to execute a task that consists of four steps. Each step relies on the result of the previous one. Executing all steps in one thread takes a long time. I want to use four threads, each performing one step, with a buffer between each pair of neighboring steps to store the result of the earlier step. I am developing on the Android platform using Java. Can anybody give me an example?

Thanks a lot.

YL

EnJ
  • If each thread relies on the result of the previous thread, then you CANNOT run that in parallel and you might as well run them sequentially in one thread. When you say heavy, is it causing a "too much work on main thread" error in Android? – SteelToe Oct 18 '19 at 06:04
  • If each step depends on the previous one, is it possible to do faster? – Thorbjørn Ravn Andersen Oct 18 '19 at 06:06
  • No idea about the Android environment, but in Java SE there are [reactive streams](https://www.baeldung.com/java-9-reactive-streams) which serve this purpose. – Curiosa Globunznik Oct 18 '19 at 06:21
  • @curiosa they don't allow magical parallelism though, they just reduce overhead (managing the buffers mentioned in the question). – Kayaman Oct 18 '19 at 06:55
  • Magical parallelism...now I got it. I tried to read the question in a way that makes sense rather than a contradiction. No magic here, you'd need more than one item passing through the parallel processors, perhaps [parallel streams](https://dzone.com/articles/parallel-and-asynchronous-programming-in-java-8) are worth taking a look at, here's another link, [Optimize stream pipelines for parallel processing](https://developer.ibm.com/articles/j-java-streams-5-brian-goetz/), probing the same approach. – Curiosa Globunznik Oct 18 '19 at 10:19

1 Answer

I was curious what the (non-magically parallel) code would look like using reactive streams in Java 9. It turned out that the Java 9 infrastructure is fragmentary and should be used with caution, so I based the example on RxJava, which provides "Flows" (the Flowable type). There is an Android extension available for it.

To put it in perspective, the example runs the same work through a sequential stream, a parallel stream, a sequential flow, and a parallel flow.

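import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test; // assumes JUnit 4; use org.junit.jupiter.api.Test for JUnit 5

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FlowStream {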

@Test
public void flowStream() {
    int items = 10;

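    // IntStream.range excludes the upper bound, so this yields the same 0..items-1 as Flowable.range(0, items).
    List<Integer> source = IntStream.range(0, items).boxed().collect(Collectors.toList());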

    print("\nstream");
    source.stream().map(this::exp).map(this::exp).forEach(i -> print("streamed %d", i));

    print("\nparallelStream");
    source.parallelStream().map(this::exp).map(this::exp).forEach(i -> print("streamed %d parallel", i));

    print("\nflow");
    Flowable.range(0, items)
            .map(this::exp)
            .map(this::exp)
            .forEach(i -> print("flowed %d", i));

    print("\nparallel flow");
    Flowable.range(0, items)
            .flatMap(v ->
                    Flowable.just(v)
                            .subscribeOn(Schedulers.computation())
                            .map(this::exp)
            )
            .flatMap(v ->
                    Flowable.just(v)
                            .subscribeOn(Schedulers.computation())
                            .map(this::exp)
            ).forEach(i -> print("flowed parallel %d", i));

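    // The parallel flow runs on daemon threads of the computation scheduler, so wait for it to finish.
    await(5000);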

}

private Integer exp(Integer i) {
    print("making %d more expensive", i);
    await(Math.round(10f / (Math.abs(i) + 1)) * 50);
    return i;
}

private void await(int i) {
    try {
        Thread.sleep(i);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

private void print(String pattern, Object... values) {
    System.out.println(String.format(pattern, values));
}

}

    <!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.13</version>
    </dependency>
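
For the layout literally described in the question (one thread per step, a buffer between neighboring steps), here is a minimal sketch using only java.util.concurrent, with no RxJava involved. The names StagedPipeline, CAPACITY and fourStepDemo, and the four arithmetic steps, are made up for illustration and are not taken from the question. As the comments point out, this only pays off when several items pass through the pipeline; a single item is still processed strictly step by step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.UnaryOperator;

final class StagedPipeline {

    // Hypothetical buffer size between two neighboring steps.
    private static final int CAPACITY = 4;

    // Runs every step on its own thread. An empty Optional marks the end of
    // the data, so a step must never return null.
    static List<Integer> run(List<Integer> inputs, List<UnaryOperator<Integer>> steps) {
        // queues.get(i) feeds step i; the last queue collects the final results.
        List<BlockingQueue<Optional<Integer>>> queues = new ArrayList<>();
        for (int i = 0; i < steps.size(); i++) {
            queues.add(new ArrayBlockingQueue<>(CAPACITY));
        }
        queues.add(new LinkedBlockingQueue<>()); // unbounded, so the last step never blocks

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < steps.size(); i++) {
            BlockingQueue<Optional<Integer>> source = queues.get(i);
            BlockingQueue<Optional<Integer>> sink = queues.get(i + 1);
            UnaryOperator<Integer> step = steps.get(i);
            Thread worker = new Thread(() -> {
                try {
                    Optional<Integer> item;
                    do {
                        item = source.take();     // blocks while the input buffer is empty
                        sink.put(item.map(step)); // blocks while the next buffer is full
                    } while (item.isPresent());   // the end marker is forwarded, then the thread stops
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        try {
            for (Integer input : inputs) {
                queues.get(0).put(Optional.of(input));
            }
            queues.get(0).put(Optional.empty()); // signal the end of the input
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // All workers have finished; drain the results in arrival (FIFO) order.
        List<Integer> results = new ArrayList<>();
        for (Optional<Integer> item : queues.get(steps.size())) {
            item.ifPresent(results::add);
        }
        return results;
    }

    // Four made-up steps standing in for the real work: ((x + 1) * 2 - 3) squared.
    static List<Integer> fourStepDemo(List<Integer> inputs) {
        List<UnaryOperator<Integer>> steps = new ArrayList<>();
        steps.add(x -> x + 1);
        steps.add(x -> x * 2);
        steps.add(x -> x - 3);
        steps.add(x -> x * x);
        return run(inputs, steps);
    }

    public static void main(String[] args) {
        System.out.println(fourStepDemo(Arrays.asList(1, 2, 3))); // prints [1, 9, 25]
    }
}
```

Because each step is a single thread reading from a FIFO queue, the results come out in input order. The bounded queues also give back-pressure: a fast step blocks in put() until the slower step after it catches up. On Android, run this off the main thread, since run() blocks until all items are processed.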
Curiosa Globunznik