
I am trying to use cyclops-react to batch the elements of a queue by size and also by time, so that it doesn't block when there are no elements.

Maybe the functionality is not what I expected, or I am doing something wrong.

The complete code (Groovy), with the producer running in another thread, is:

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    new Thread({
        while (true) {
            sleep(1000)
            queue.offer("New message " + System.currentTimeMillis());
        }
    }).start();

    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
            .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
            .forEach({i->println(i + " Batch Time: ${System.currentTimeMillis()}")})

The output is:

    [New message 1487673650332,  Batch Time: 1487673651356]
    [New message 1487673651348, New message 1487673652352,  Batch Time: 1487673653356]
    [New message 1487673653355, New message 1487673654357,  Batch Time: 1487673655362]
    [New message 1487673655362, New message 1487673656364,  Batch Time: 1487673657365]

But I was expecting one element in each batch, since the producer offers an element only once per second while the batch timeout is half a second.

I also tried an asynchronous stream (Groovy code):

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
            .async()
            .groupedBySizeAndTime(10, 500,TimeUnit.MILLISECONDS)
            .peek({i->println(i + "Batch Time: ${System.currentTimeMillis()}")}).run();

    while (true) {
        queue.offer("New message " + System.currentTimeMillis());
        sleep(1000)
    }

Again, it only emits a batch every 2 seconds, often containing two elements, even though the batch timeout is half a second:

    [New message 1487673877780, Batch Time: 1487673878819]
    [New message 1487673878811, New message 1487673879812, Batch Time: 1487673880815]
    [New message 1487673880814, New message 1487673881819, Batch Time: 1487673882823]
    [New message 1487673882823, New message 1487673883824, Batch Time: 1487673884828]
    [New message 1487673884828, New message 1487673885831, Batch Time: 1487673886835]

I ran a third experiment with a plain (non-future, non-lazy) stream, and this time it worked:

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    new Thread({
        while (true) {
            sleep(1000)
            queue.offer("New message " + System.currentTimeMillis());
        }
    }).start();

    queue.stream()
            .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
            .forEach({i->println(i + " Batch Time " + System.currentTimeMillis())})

Result:

    [New message 1487673288017, New message 1487673289027,  Batch Time , 1487673289055]
    [New message 1487673290029,  Batch Time , 1487673290029]
    [New message 1487673291033,  Batch Time , 1487673291033]
    [New message 1487673292037,  Batch Time , 1487673292037]

Why does batching seem to behave incorrectly when I use a FutureStream?

John McClean

1 Answer


The difference in behaviour is caused by a bug that reduces the efficiency of grouping FutureStreams of an async.Queue. When the next result arrives within the 500ms limit of the previous one, the Stream asks the Queue for another value and then waits until that value arrives. This will be fixed in future releases of cyclops-react.
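For comparison, here is a minimal sketch of the behaviour the question expected from size-and-time grouping. Each poll waits only for the time remaining until the batch deadline, so the batch is emitted on time even if no further value arrives. The sketch assumes a plain `java.util.concurrent.BlockingQueue` instead of the cyclops-react async.Queue. The names `SizeAndTimeBatch` and `nextBatch` are hypothetical, and this is not how cyclops-react implements `groupedBySizeAndTime`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating size-and-time batching with a deadline
final class SizeAndTimeBatch {

    private SizeAndTimeBatch() {
    }

    /**
     * Collects up to maxSize elements, but never waits longer than timeout in
     * total: each poll only waits for the time remaining until the batch
     * deadline, so an empty queue cannot hold the batch open.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize,
                                 long timeout, TimeUnit unit) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time limit reached: emit whatever has been collected
            }
            try {
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next != null) {
                    batch.add(next);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop early
                break;
            }
        }
        return batch;
    }
}
```

Consider a producer that offers one message per second, and a consumer that calls `nextBatch(queue, 10, 500, TimeUnit.MILLISECONDS)` in a loop. In that setup, each batch would typically hold at most one message, which is what the question expected.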

It is possible to work around this in a couple of ways:

  1. Use the workaround suggested by Jesus Menendez in the bug report:

    queue.stream()
         .groupedBySizeAndTime(batchSize, batchTimeoutMillis, TimeUnit.MILLISECONDS)
         .futureStream(new LazyReact(ThreadPools.getSequential()))
         .async()
         .peek(this::executeBatch)
         .run();
    

This avoids the overhead that results in two values being batched together.

  2. Time out after 500ms, instead of waiting until a value arrives in the Queue, by using the streamBatch operator:

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    // Producer: offer a message every 10 seconds, then close the queue
    new Thread(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                queue.offer("New message " + i);
                Thread.sleep(10000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        queue.close();
    }).start();

    long toRun = TimeUnit.MILLISECONDS.toNanos(500L);

    queue.streamBatch(new Subscription(), source -> {
        // The returned supplier builds one batch per call:
        // up to 10 values, or whatever arrived within 500ms
        return () -> {
            List<String> result = new ArrayList<>();
            long start = System.nanoTime();

            while (result.size() < 10 && (System.nanoTime() - start) < toRun) {
                try {
                    // Wait at most 1ms for the next value
                    String next = source.apply(1L, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        result.add(next);
                    }
                } catch (Queue.QueueTimeoutException e) {
                    // No value within 1ms; keep polling until the time limit
                }
            }
            return result;
        };
    }).filter(l -> l.size() > 0)   // drop empty batches
      .futureStream(new LazyReact(ThreadPools.getSequential()))
      .async()
      .peek(System.out::println)
      .run();
    

In this case a batch is always completed after at most 500ms, instead of waiting until a value we have asked for arrives in the Queue.
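The same structure can be sketched with plain `java.util.stream`, assuming a `java.util.concurrent.BlockingQueue` in place of the cyclops-react async.Queue. As in the streamBatch example, a supplier builds each batch by polling in 1ms steps until it has `maxSize` elements or `maxWait` has elapsed. `Stream.generate` turns that supplier into an infinite stream of batches, and empty batches are filtered out. `TimedBatchStream` and `batches` are hypothetical names. This is an illustration of the idea, not the cyclops-react API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical stdlib analogue of the streamBatch-based workaround
final class TimedBatchStream {

    private TimedBatchStream() {
    }

    /**
     * Infinite stream of non-empty batches. Each batch is closed after
     * maxSize elements or after maxWait, whichever comes first, so the
     * stream never waits indefinitely for a value to arrive.
     */
    static <T> Stream<List<T>> batches(BlockingQueue<T> queue, int maxSize,
                                       long maxWait, TimeUnit unit) {
        long maxWaitNanos = unit.toNanos(maxWait);
        Supplier<List<T>> nextBatch = () -> {
            List<T> result = new ArrayList<>();
            long start = System.nanoTime();
            while (result.size() < maxSize && System.nanoTime() - start < maxWaitNanos) {
                try {
                    // Wait at most 1ms for the next value, as in the answer above
                    T next = queue.poll(1, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        result.add(next);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while batching", e);
                }
            }
            return result;
        };
        // Drop empty batches, like .filter(l -> l.size() > 0) above
        return Stream.generate(nextBatch).filter(batch -> !batch.isEmpty());
    }
}
```

A consumer such as `TimedBatchStream.batches(queue, 10, 500, TimeUnit.MILLISECONDS).forEach(System.out::println)` would receive each non-empty batch roughly 500ms after that batch started collecting, or sooner once 10 elements have been collected.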

John McClean