Context: To process a Flowable<Item>, I first need to inspect the first item and then, depending on it, either accumulate all items into a single item (reduce) or apply a simple map to each item without any accumulation (map).

One way I can think of requires the operator to know that the current element is the last element. Is there an operator that knows whether the current element is the last one? I can't use buffer because it would always fetch 2 elements, even when no accumulation should be done.

// Holds the accumulated item; starts from an empty Item
AtomicReference<Item> itemRef = new AtomicReference<>(new Item());
Flowable<Item> accumulateOrProcessFlowable = source
    .flatMap(item -> {
        if (item.shouldBeAccumulated()) {
            // Accumulate data into the reference
            itemRef.set(itemRef.get().addData(item.getData()));
            // Return empty to throw away the consumed item
            return Flowable.empty();
        } else {
            item.updateProperty();
            return Flowable.just(item);
        }
    })
    // Hypothetical operator: should run only for the last element
    .applyIfLastElement(item -> {
        if (item.shouldBeAccumulated()) {
            return Flowable.just(itemRef.get());
        }
    });
Filip
gGwP
  • If all you want to do is to know when you've got the last item then you can use .buffer(2, 1) (buffer in chunks of size 2 with step 1). When you have the last item the length of the list will be 1. – Dave Moten May 12 '22 at 08:39
  • buffer will fetch current and next item **every time**, that's why I don't want to use that. This is noted in the question itself (maybe my bad formatting made it hard to notice). – gGwP May 13 '22 at 05:53
  • I wanted to make sure you knew about the step 1 trick. Either way the next item gets used eventually. You haven't made it clear what aspect of using buffer is a problem. I knocked up a solution to the problem mentioned in context. If that's of use to you then you might want to change the title. – Dave Moten May 13 '22 at 21:33
  • Here, I'm getting "mapper returned a null Publisher", possibly because of the Flowable.empty(). Is there any alternative? @DaveMoten – gGwP Jun 07 '22 at 12:25

1 Answer

Here is one way to do it in RxJava 2.x (the RxJava 3.x API is very close). The trick is to combine defer with concatWith. defer creates fresh state for each subscription, which is the best way to encapsulate state for a Flowable so that it can be subscribed to many times. Wrapping last in Maybe.defer also makes it lazy: the accumulated state is read only after the source has completed. As a performance improvement that you may not care about, I used one-element arrays instead of AtomicReference objects to avoid unnecessary volatile reads and writes.

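// Assumes source is a Flowable<Integer> and shouldBeAccumulated(Integer) is defined elsewhere
Flowable<Integer> result = Flowable.defer(() -> {
    // Per-subscription state; one-element arrays so the lambdas can mutate them
    boolean[] isFirst = new boolean[] { true };
    Integer[] state = new Integer[1];
    // Evaluated only after source completes: emits the accumulated value, if any
    Maybe<Integer> last = Maybe.defer(() -> {
        if (state[0] == null) {
            return Maybe.empty();
        } else {
            return Maybe.just(state[0]);
        }
    });
    return source //
            .flatMap(x -> {
                if (state[0] != null || (isFirst[0] && shouldBeAccumulated(x))) {
                    // accumulate, seeding the state with the first item
                    state[0] = state[0] == null ? x : state[0] + x;
                    isFirst[0] = false;
                    return Flowable.empty();
                } else {
                    // no accumulation: pass the item through unchanged
                    isFirst[0] = false;
                    return Flowable.just(x);
                }
            })
            .concatWith(last);
});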
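
To sanity-check what such a pipeline should emit, the same "decide on the first item" logic can be modeled in plain Java without RxJava. This is only a rough sketch under stated assumptions: FirstItemModel, process, and the even-number predicate are hypothetical names standing in for the real source and shouldBeAccumulated, and the model seeds the accumulator with the first item so that the first item is included in the total.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Plain-Java model (no RxJava) of the "decide on the first item" logic.
// FirstItemModel, process, and the predicate are hypothetical, for illustration only.
final class FirstItemModel {

    // If the first item satisfies shouldAccumulate, every item is summed and the
    // single total is emitted at the end; otherwise items pass through unchanged.
    static List<Integer> process(List<Integer> source, IntPredicate shouldAccumulate) {
        List<Integer> out = new ArrayList<>();
        boolean first = true;
        Integer state = null; // non-null once accumulation mode has been chosen
        for (int x : source) {
            if (state != null || (first && shouldAccumulate.test(x))) {
                state = (state == null) ? x : state + x; // seed with the first item
            } else {
                out.add(x); // map-only mode: emit the item as-is
            }
            first = false;
        }
        if (state != null) {
            out.add(state); // plays the role of concatWith(last)
        }
        return out;
    }

    public static void main(String[] args) {
        IntPredicate isEven = x -> x % 2 == 0; // assumed stand-in for shouldBeAccumulated
        System.out.println(process(List.of(2, 3, 4), isEven)); // [9]
        System.out.println(process(List.of(1, 2, 3), isEven)); // [1, 2, 3]
        System.out.println(process(List.of(), isEven));        // []
    }
}
```

When the first item qualifies, only the total comes out, and only after the input is exhausted. When it does not, each item is emitted immediately and nothing is appended at the end, which is the behavior that buffering two items at a time would not give.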
Dave Moten