
Recently I've been trying to reimplement my data parser using streams in Java, but I can't figure out how to do one specific thing:

Consider an object A with a timestamp. Consider an object B which is made up of various A objects. Consider some metric which tells us the time range for an object B.

What I have now is a stateful method which goes through the list of A objects: if an A fits into the last B, it goes there; otherwise a new B instance is created and the following A objects are put there.

I would like to do this the streams way.

Take the whole list of A objects and turn it into a stream. Now I need to figure out a function which will create "chunks" and accumulate them into B objects. How do I do that?
Thanks

EDIT:

A and B are complex, but I will try to post a simplified version here.

class A {
    private final long time;
    A(long time) {
        this.time = time;
    }
    long getTime() {
        return time;
    }
}

class B {
     // not important; built from a "full" TemporaryB instance
     // (the result of accumulation)
}

class TemporaryB {
    private final long startingTime;
    private int counter;

    public TemporaryB(A a) {
        this.startingTime = a.getTime();
    }

    boolean fits(A a) {
        return a.getTime() - startingTime < THRESHOLD;
    }

    void add(A a) {
        counter++;
    }
}

class Accumulator {
    private final List<B> accumulatedB = new ArrayList<>();
    private TemporaryB temporaryB; // null until the first A arrives

    public void addA(A a) {
        if (temporaryB == null) {
            temporaryB = new TemporaryB(a);
        } else if (temporaryB.fits(a)) {
            temporaryB.add(a);
        } else {
            // a does not fit: finish the current chunk and start a new one
            accumulatedB.add(new B(temporaryB));
            temporaryB = new TemporaryB(a);
        }
    }
}

OK, so this is a very simplified version of how I do this now. I don't like it; it's ugly.

Tagir Valeev
Snurka Bill

1 Answer


In general, this kind of problem is poorly suited to the Stream API, because you may need non-local knowledge, which makes parallel processing harder. Imagine that you have new A(1), new A(2), new A(3) and so on up to new A(1000), with the threshold set to 10. You basically need to combine the input into batches of 10 elements. Here we have the same problem as discussed in this answer: when we split the task into subtasks, the suffix part cannot know exactly how many elements are in the prefix part, so it cannot even start combining data into batches until the whole prefix is processed. Your problem is essentially serial.
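
To make the serial dependence concrete, here is a minimal plain-JDK sketch (no StreamEx). The class name SerialChunking and the use of bare long timestamps instead of A objects are assumptions made for illustration only. Whether an element opens a new chunk depends on where the current chunk started, which cannot be known without processing everything before it.

```java
import java.util.ArrayList;
import java.util.List;

final class SerialChunking {
    // Hypothetical threshold, matching the example in the paragraph above.
    static final long THRESHOLD = 10;

    /** Splits ascending timestamps into chunks, each spanning less than THRESHOLD. */
    static List<List<Long>> chunk(List<Long> times) {
        List<List<Long>> chunks = new ArrayList<>();
        List<Long> current = null;
        long start = 0;
        for (long t : times) {
            // The decision for t depends on the start of the current chunk,
            // which in turn depends on every element seen so far.
            if (current == null || t - start >= THRESHOLD) {
                current = new ArrayList<>();
                chunks.add(current);
                start = t;
            }
            current.add(t);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Long> times = new ArrayList<>();
        for (long t = 1; t <= 1000; t++) {
            times.add(t);
        }
        List<List<Long>> chunks = chunk(times);
        System.out.println(chunks.size() + " chunks, first = " + chunks.get(0));
    }
}
```

If the list were split in the middle for parallel processing, the second half could not tell whether its first element starts a chunk or continues one from the first half.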

On the other hand, there is a solution based on the new headTail method in my StreamEx library. This method parallelizes badly, but with it you can define almost any operation in just a few lines.

Here's how to solve your problem with headTail:

static StreamEx<TemporaryB> combine(StreamEx<A> input, TemporaryB tb) {
    return input.headTail((head, tail) ->
        tb == null ? combine(tail, new TemporaryB(head)) :
            tb.fits(head) ? combine(tail, tb.add(head)) :
                combine(tail, new TemporaryB(head)).prepend(tb), 
        () -> StreamEx.ofNullable(tb));
}

Here I modified the add method of your TemporaryB so that it returns this:

TemporaryB add(A a) {
    counter++;
    return this;
}

Sample (assuming THRESHOLD = 1000):

List<A> input = Arrays.asList(new A(1), new A(10), new A(1000), new A(1001),
        new A(1002), new A(1003), new A(2000), new A(2002), new A(2003), new A(2004));

Stream<B> streamOfB = combine(StreamEx.of(input), null).map(B::new);
streamOfB.forEach(System.out::println);

Output (I wrote a simple B.toString()):

B [counter=2, startingTime=1]
B [counter=3, startingTime=1001]
B [counter=2, startingTime=2002]

So here you actually have a lazy Stream of B.


Explanation:

StreamEx.headTail takes two lambdas. The first is called at most once, when the input stream is non-empty. It receives the first stream element (head) and a stream containing all the other elements (tail). The second is called at most once, when the input stream is empty, and receives no parameters. Both should produce an output stream, which is used in place of the input. So here is what we have:

return input.headTail((head, tail) ->

tb == null is the starting case: create a new TemporaryB from the head and call self with the tail:

    tb == null ? combine(tail, new TemporaryB(head)) :

If tb.fits(head), just add the head to the existing tb and call self with the tail:

        tb.fits(head) ? combine(tail, tb.add(head)) :

Otherwise, again create a new TemporaryB(head), but also prepend the current tb to the output stream (actually emitting a new element into the target stream):

            combine(tail, new TemporaryB(head)).prepend(tb), 

Is the input stream exhausted? Then return the last gathered tb, if any:

    () -> StreamEx.ofNullable(tb));
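
The control flow walked through above can be imitated on plain lists, which may help if you want to trace it by hand. This is only a sketch: the headTail helper below is a hypothetical eager stand-in written for illustration, not the StreamEx method, and Chunk is a simplified substitute for TemporaryB that works on bare long timestamps. Unlike the real headTail, this version is not lazy and its recursion depth grows with the input size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

final class HeadTailSketch {
    static final long THRESHOLD = 1000; // same value as in the sample above

    // Simplified stand-in for TemporaryB.
    static final class Chunk {
        final long startingTime;
        int counter; // counts elements added after the first one, as in the original

        Chunk(long startingTime) { this.startingTime = startingTime; }
        boolean fits(long time) { return time - startingTime < THRESHOLD; }
        Chunk add(long time) { counter++; return this; }
        @Override public String toString() {
            return "Chunk [counter=" + counter + ", startingTime=" + startingTime + "]";
        }
    }

    // Hypothetical eager analogue of headTail: exactly one of the two callbacks runs.
    static <T, R> List<R> headTail(List<T> input,
                                   BiFunction<T, List<T>, List<R>> mapper,
                                   Supplier<List<R>> onEmpty) {
        return input.isEmpty()
                ? onEmpty.get()
                : mapper.apply(input.get(0), input.subList(1, input.size()));
    }

    static List<Chunk> combine(List<Long> input, Chunk current) {
        return headTail(input, (head, tail) -> {
            if (current == null) return combine(tail, new Chunk(head));
            if (current.fits(head)) return combine(tail, current.add(head));
            // head does not fit: emit the finished chunk ahead of the rest
            List<Chunk> result = new ArrayList<>();
            result.add(current);
            result.addAll(combine(tail, new Chunk(head)));
            return result;
        }, () -> {
            // input exhausted: emit the last chunk, if there is one
            List<Chunk> last = new ArrayList<>();
            if (current != null) last.add(current);
            return last;
        });
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(
                1L, 10L, 1000L, 1001L, 1002L, 1003L, 2000L, 2002L, 2003L, 2004L);
        combine(input, null).forEach(System.out::println);
    }
}
```

On the sample input this yields three chunks with the same counters and starting times as the output shown earlier.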

Note that the headTail implementation guarantees that such a solution, while it looks recursive, uses no more than a constant amount of stack and heap. If in doubt, you can check it on thousands of input elements:

Stream<B> streamOfB = combine(LongStreamEx.range(100000).mapToObj(A::new), null).map(B::new);
streamOfB.forEach(System.out::println);
Tagir Valeev