1

Is it possible to have concurrent subscribers using cyclops-react library? For example, if a run the following code:

    ReactiveSeq<Integer> initialStream = ReactiveSeq.of(1, 2, 3, 4, 5, 6);

    ReactiveSubscriber<Integer> sub1 = Spouts.reactiveSubscriber();
    ReactiveSubscriber<Integer> sub2 = Spouts.reactiveSubscriber();

    FutureStream<Integer> futureStream = FutureStream.builder().fromStream(initialStream)
            .map(v -> v -1);

    futureStream.subscribe(sub1);
    futureStream.subscribe(sub2);

    CompletableFuture future1 = CompletableFuture.runAsync(() -> sub1.reactiveStream().forEach(v -> System.out.println("1 -> " + v)));
    CompletableFuture future2 = CompletableFuture.runAsync(() -> sub2.reactiveStream().forEach(v -> System.out.println("2 -> " + v)));

    try {
        future1.get();
        future2.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

I get the following result:

1 -> 0 
2 -> 0
2 -> 1
1 -> 0
1 -> 1
1 -> 1
2 -> 2
2 -> 3
2 -> 4
2 -> 5
1 -> 2
1 -> 2
1 -> 3
1 -> 4
1 -> 5
1 -> 3
1 -> 4
1 -> 5

I'm getting repeated values on the subscribers streams. Thank's in advance for any help.

Pedro Alipio
  • 117
  • 1
  • 8

1 Answers1

0

cyclops-react only supports single subscribers. I think the behaviour here should be changed to ignore the second subscription attempt rather than allow it to mess up both (I will log a bug - thank you!).

You may be able to use Topics to the same effect however. We can rewrite your example using Topics

ReactiveSeq<Integer> initialStream = ReactiveSeq.of(1,2,3,4,5,6);



        FutureStream<Integer> futureStream = FutureStream.builder()
                                                         .fromStream(initialStream)
                                                         .map(v -> v -1);
        Queue<Integer> queue= QueueFactories.<Integer>boundedNonBlockingQueue(1000).build();
        Topic<Integer> topic = new Topic<Integer>(queue,QueueFactories.<Integer>boundedNonBlockingQueue(1000));

        ReactiveSeq<Integer> s2 = topic.stream();
        ReactiveSeq<Integer> s1 = topic.stream();

        Thread t = new Thread(()->{
            topic.fromStream(futureStream);
            topic.close();
        });
        t.start();


        CompletableFuture future1 = CompletableFuture.runAsync(() -> s1.forEach(v -> System.out.println("1 -> " + v)));
        CompletableFuture future2 = CompletableFuture.runAsync(() -> s2.forEach(v -> System.out.println("2 -> " + v)));

        try {

            future1.get();
            future2.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

And the output is more inline with what we might expect

2 -> 0 1 -> 0 2 -> 1 1 -> 1 2 -> 2 1 -> 2 2 -> 3 1 -> 3 2 -> 4 1 -> 4 2 -> 5 1 -> 5

John McClean
  • 5,225
  • 1
  • 22
  • 30