Is it possible to have concurrent subscribers using the cyclops-react library? For example, if I run the following code:
ReactiveSeq<Integer> initialStream = ReactiveSeq.of(1, 2, 3, 4, 5, 6);
ReactiveSubscriber<Integer> sub1 = Spouts.reactiveSubscriber();
ReactiveSubscriber<Integer> sub2 = Spouts.reactiveSubscriber();
FutureStream<Integer> futureStream = FutureStream.builder().fromStream(initialStream)
        .map(v -> v - 1);
futureStream.subscribe(sub1);
futureStream.subscribe(sub2);
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> sub1.reactiveStream().forEach(v -> System.out.println("1 -> " + v)));
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> sub2.reactiveStream().forEach(v -> System.out.println("2 -> " + v)));
try {
    future1.get();
    future2.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
I get the following result:
1 -> 0
2 -> 0
2 -> 1
1 -> 0
1 -> 1
1 -> 1
2 -> 2
2 -> 3
2 -> 4
2 -> 5
1 -> 2
1 -> 2
1 -> 3
1 -> 4
1 -> 5
1 -> 3
1 -> 4
1 -> 5
Values are repeated on the subscribers' streams: in this run, subscriber 1 receives every value twice, while subscriber 2 receives each value once. Thanks in advance for any help.
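To make the expectation concrete, here is a minimal sketch of the behaviour I am after, written against the JDK's java.util.concurrent.Flow API (Java 9+) instead of cyclops-react. It is only a comparison, not a claim about how cyclops-react should be used; the names ExpectedFanOut, Collector and run are made up for this illustration. Two subscribers are attached to one publisher, and each should see every mapped value exactly once.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ExpectedFanOut {

    // Hypothetical helper: records every value it receives and
    // releases a latch once the stream terminates.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer v) { received.add(v); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Publishes 1..6 mapped with v -> v - 1 to two subscribers and
    // returns what each of them received.
    static List<List<Integer>> run() throws InterruptedException {
        Collector sub1 = new Collector();
        Collector sub2 = new Collector();
        // close() (via try-with-resources) signals onComplete to both subscribers.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub1);
            publisher.subscribe(sub2);
            for (int v = 1; v <= 6; v++) {
                publisher.submit(v - 1);
            }
        }
        sub1.done.await();
        sub2.done.await();
        return List.of(List.copyOf(sub1.received), List.copyOf(sub2.received));
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Integer>> result = run();
        System.out.println("1 -> " + result.get(0));
        System.out.println("2 -> " + result.get(1));
    }
}
```

With this sketch both subscribers end up with the values 0 through 5, each exactly once, which is the result I would like to get from the FutureStream version as well.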