
Using cyclops-react 1.0.0-RC3, I tried to recreate the batching examples from the cyclops-react streams user guide. I found that some methods were missing from ReactiveSeq, including batchBySize and windowByTime.

I did find these methods on StreamUtils and they worked as expected, but the result didn't look quite as slick as the examples in the user guide.

From the user guide:

// Example 19. Batch by size example
ReactiveSeq.of(1,2,3,4,5, 6)
  .map(n-> n==6? sleep(1) : n)
  .batchBySize(4) // this function seems to be missing...
  .toList()

What I could get working:

import com.aol.cyclops.control.ReactiveSeq;
import java.util.stream.Collectors;
// ...
StreamUtils.batchBySize(
    ReactiveSeq.of(1, 2, 3, 4, 5, 6)
        .map(n -> TestUtils.mayBeSlow(n)),
    4)
    .collect(Collectors.toList());

You can see my code in a working JUnit test: the testBatchingSlidingWindowing method of the test class StreamsTest.java.

Should I expect to find batchBySize and windowByTime on ReactiveSeq, or is using StreamUtils the appropriate way?

John McClean
James Burton

1 Answer


Use grouped instead. It is available on all cyclops-react Traversable types (such as ListX, SetX, QueueX, DequeX, ReactiveSeq and others), so your example becomes:

ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .grouped(4) 
           .toList()

The groupedXXX operators act like both batchByXXX and windowByXXX, providing access to the grouped data via an extended Collection type, which itself has all of the traversable and foldable operators.
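To make the grouping semantics concrete, here is a minimal plain-Java sketch of what splitting into groups of a fixed size produces. It does not use cyclops-react at all: the class `GroupedSketch` and its `grouped` helper are hypothetical names introduced for illustration, and the sketch assumes that a final, smaller group is emitted as-is.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupedSketch {

    // Hypothetical helper (not a cyclops-react API): splits a list into
    // consecutive groups of at most `size` elements.
    public static <T> List<List<T>> grouped(List<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> groups = new ArrayList<>();
        for (int start = 0; start < source.size(); start += size) {
            int end = Math.min(start + size, source.size());
            // Copy the slice so each group is independent of the source list.
            groups.add(new ArrayList<>(source.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<Integer>> groups = grouped(Arrays.asList(1, 2, 3, 4, 5, 6), 4);
        System.out.println(groups); // prints [[1, 2, 3, 4], [5, 6]]
    }
}
```

Unlike this eager helper, the library operator works on a stream, so groups can be emitted as elements arrive rather than after the whole input is known.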

E.g. to double each member of the group / batched list:

ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .grouped(4) 
           .map(list-> list.map(i->i*2))
           .toList() 

You can also use groupedT which returns a ListTransformer. ListTransformers allow you to manipulate nested structures as if they were unnested.

E.g. to double each member of the group / batched list using groupedT:

ReactiveSeq.of(1,2,3,4,5, 6)
           .map(n-> n==6? sleep(1) : n)
           .groupedT(4) 
           .map(i->i*2);
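As a rough illustration of what "manipulating nested structures as if they were unnested" means, the following plain-Java sketch applies a per-element function through one level of list nesting while preserving the grouping. The class `NestedMapSketch` and its `mapNested` helper are hypothetical names for this sketch only. They are not part of cyclops-react and say nothing about how ListTransformer is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class NestedMapSketch {

    // Hypothetical helper (not a cyclops-react API): applies `fn` to every
    // element of every inner list, keeping the outer grouping intact.
    public static <T, R> List<List<R>> mapNested(List<List<T>> nested, Function<T, R> fn) {
        List<List<R>> result = new ArrayList<>();
        for (List<T> inner : nested) {
            List<R> mapped = new ArrayList<>();
            for (T item : inner) {
                mapped.add(fn.apply(item));
            }
            result.add(mapped);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> groups = Arrays.asList(
                Arrays.asList(1, 2, 3, 4),
                Arrays.asList(5, 6));
        // The caller writes a function over single elements, not over lists.
        List<List<Integer>> doubled = mapNested(groups, i -> i * 2);
        System.out.println(doubled); // prints [[2, 4, 6, 8], [10, 12]]
    }
}
```

The point of the comparison is the shape of the lambda: with grouped the caller maps over each list and then over its elements, whereas with groupedT the lambda is written directly against the elements.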

And to convert a ListTransformer back to a Stream of Lists:

ListTSeq<Integer> listT = ReactiveSeq.of(1,2,3,4,5, 6)
                                     .map(n-> n==6? sleep(1) : n)
                                     .groupedT(4);

ReactiveSeq<ListX<Integer>> nested = listT.toNestedListX()
                                          .stream(); 
John McClean
    Perfect! Thanks, John. I should have thought to look in your tests. [BatchingTest.java](https://github.com/aol/cyclops-react/blob/master/src/test/java/com/aol/cyclops/streams/BatchingTest.java) and [WindowingTest.java](https://github.com/aol/cyclops-react/blob/master/src/test/java/com/aol/cyclops/streams/WindowingTest.java) show these 'group' methods in action. – James Burton May 24 '16 at 08:31