I am trying to use cyclops-react to batch the elements from a queue, based on size, but also on time, so it doesn't block when there are no elements
Maybe the functionality is not what I expected or I am doing something wrong
The complete code (Groovy) is like this with the producer in another thread:
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
new Thread({
while (true) {
sleep(1000)
queue.offer("New message " + System.currentTimeMillis());
}
}).start();
StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
.groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
.forEach({i->println(i + " Batch Time: ${System.currentTimeMillis()}")})
The output is:
[New message 1487673650332, Batch Time: 1487673651356]
[New message 1487673651348, New message 1487673652352, Batch Time: 1487673653356]
[New message 1487673653355, New message 1487673654357, Batch Time: 1487673655362]
[New message 1487673655362, New message 1487673656364, Batch Time: 1487673657365]
But I was expecting one element in each batch since the delay between elements offered is 10seconds but the batching is every half a second
Also I tried with an asynchronous stream (Groovy code):
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
.async()
.groupedBySizeAndTime(10, 500,TimeUnit.MILLISECONDS)
.peek({i->println(i + "Batch Time: ${System.currentTimeMillis()}")}).run();
while (true) {
queue.offer("New message " + System.currentTimeMillis());
sleep(1000)
}
Again, it only batches every 2 seconds, sometimes waiting for two elements per batch, even if the timeout in the batch is half second:
[New message 1487673877780, Batch Time: 1487673878819]
[New message 1487673878811, New message 1487673879812, Batch Time: 1487673880815]
[New message 1487673880814, New message 1487673881819, Batch Time: 1487673882823]
[New message 1487673882823, New message 1487673883824, Batch Time: 1487673884828]
[New message 1487673884828, New message 1487673885831, Batch Time: 1487673886835]
I did a third experiment with a non future non lazy stream, and this time it worked.
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
new Thread({
while (true) {
sleep(1000)
queue.offer("New message " + System.currentTimeMillis());
}
}).start();
queue.stream()
.groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
.forEach({i->println(i + " Batch Time " + System.currentTimeMillis())})
Result:
[New message 1487673288017, New message 1487673289027, Batch Time , 1487673289055]
[New message 1487673290029, Batch Time , 1487673290029]
[New message 1487673291033, Batch Time , 1487673291033]
[New message 1487673292037, Batch Time , 1487673292037]
Why the behaviour of the batching seems to be wrong when you use a future stream?