
I am trying to implement a simple sliding window function in RxJava2, but I'm struggling to get the result I want.

My goal is to take a stream of objects, e.g.

["a", "b", "c", "d", "e"]

and apply a sliding window which will return the elements adjacent to each element.

That is, the result should be:

["a", "b"]
["a", "b", "c"]
["b", "c", "d"]
["c", "d", "e"]
["d", "e"]

As a diagram:

a----------------b----------------c----------------d----------------e
↓                ↓                ↓                ↓                ↓
↓                ↓                ↓                ↓                ↓
↓                ↓                ↓                ↓                ↓
↓                ↓                ↓                ↓                ↓
["a", "b"]       ["a", "b", "c"]  ["b", "c", "d"]  ["c", "d", "e"]  ["d", "e"]

I can't seem to figure out how to make this happen. A Google Groups post seems like it is on the right track, but doesn't quite get the result I need: https://groups.google.com/forum/#!topic/rxjava/k-U5BijXinU

Any ideas?

Cory Dolphin
  • I don't think the problem title is suitable for the problem statement. A sliding window is something like [this](http://stackoverflow.com/questions/8269916/what-is-sliding-window-algorithm-examples) while it seems you want the adjacent elements of each element. – jrook Apr 10 '17 at 03:26
  • There is a problem with your diagram. You cannot possibly emit an item as output before it has actually arrived as input. E.g. after receiving items "a" and "b" you can't say that the next adjacent would be "c". Otherwise the `buffer()` method will do. – Yaroslav Stavnichiy Apr 10 '17 at 12:20
  • @jrook I think it is the same as a sliding window with a different boundary condition. – Cory Dolphin Apr 10 '17 at 18:21
  • @YaroslavStavnichiy it is okay to be phase-delayed for my purposes, i.e. buffering is OK. – Cory Dolphin Apr 10 '17 at 18:22

2 Answers


Depending on whether you want your observable to emit List<Item> or Observable<Item>, you can use either the buffer() or the window() operator. The solution isn't that clean, but it's pretty straightforward:

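// Prepend a sentinel so that "a" also ends up as the middle of a buffer.
Observable.fromArray("a", "b", "c", "d", "e")
        .startWith("")
        // Overlapping buffers: up to 3 items each, advancing 1 item at a time.
        .buffer(3, 1)
        .map(strings -> {
            // Strip the sentinel from the first buffer.
            strings.remove("");
            return strings;
        })
        // Drop the trailing single-element buffer ["e"].
        .filter(strings -> strings.size() > 1)
        .subscribe(System.out::println);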

emits

["a", "b"]
["a", "b", "c"]
["b", "c", "d"]
["c", "d", "e"]
["d", "e"]
Lamorak
  • This would've been perfect, but I couldn't figure out how to make a generic sentinel element (as "" is in this case), when using a richer object, i.e. a custom type. In my case, the strings are actually e.g. CustomerEvents. – Cory Dolphin Apr 10 '17 at 18:51
  • 1
    Create an instance of CustomerEvents with some values in CustomerEvents class and make it static. Then you can test against the "sentinal" objekt by accessing the static member. – Sergej Isbrecht Apr 11 '17 at 14:22
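The static-sentinel idea from the comment above can be sketched in plain Java, without RxJava, to show the boundary handling on its own. Everything here is an assumption made for illustration: `CustomerEvent`, its `name` field, `SENTINEL`, and `SentinelWindows.adjacent` are hypothetical names, and the loop only imitates what `startWith(SENTINEL).buffer(3, 1)` followed by the `map` and `filter` steps would do. The sentinel is compared by identity (`==`), so it cannot collide with a real event that happens to have equal values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the richer event type mentioned in the comments.
final class CustomerEvent {
    // Shared placeholder instance; always compared by identity, never by equals().
    static final CustomerEvent SENTINEL = new CustomerEvent("<sentinel>");

    final String name;

    CustomerEvent(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}

final class SentinelWindows {
    // Imitates: startWith(SENTINEL) -> buffer(3, 1) -> strip sentinel -> keep size > 1.
    static List<List<CustomerEvent>> adjacent(List<CustomerEvent> events) {
        List<CustomerEvent> padded = new ArrayList<>();
        padded.add(CustomerEvent.SENTINEL);
        padded.addAll(events);

        List<List<CustomerEvent>> result = new ArrayList<>();
        for (int start = 0; start < padded.size(); start++) {
            // Trailing windows are shorter, like the partial buffers emitted on completion.
            int end = Math.min(start + 3, padded.size());
            List<CustomerEvent> window = new ArrayList<>(padded.subList(start, end));
            window.removeIf(e -> e == CustomerEvent.SENTINEL);
            if (window.size() > 1) {
                result.add(window);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<CustomerEvent> events = new ArrayList<>();
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            events.add(new CustomerEvent(s));
        }
        // Prints [[a, b], [a, b, c], [b, c, d], [c, d, e], [d, e]]
        System.out.println(adjacent(events));
    }
}
```

In an actual RxJava chain the same identity check would presumably sit inside the `map` step in place of `strings.remove("")`.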

Perhaps, as @jrook said, this isn't the best fit for standard windowing. The array size itself is not enough information to know which side of the element you are on anyway, so I wrapped it in a simple value class.

Here's the solution I went with. It is definitely not a good solution for larger streams, since it waits for the whole observable to complete and holds every element in memory before emitting anything, which may not be acceptable for some use cases (in mine, it is OK).

  // Pairs every element with its previous and next neighbour (null at either end).
  public static <R> ObservableTransformer<R, AdjacentPairing<R>> pairWithAdjacents() {
    return upstream -> upstream
        // Collect the whole stream first; nothing is emitted until upstream completes.
        .toList()
        .flatMapObservable(list -> {
          ArrayList<AdjacentPairing<R>> pairings = new ArrayList<>(list.size());
          for (int i = 0; i < list.size(); i++) {
            pairings.add(AdjacentPairing.from(
                i == 0 ? null : list.get(i - 1),
                list.get(i),
                i == list.size() - 1 ? null : list.get(i + 1)));
          }
          return Observable.fromIterable(pairings);
        });
  }

  @AutoValue
  public static abstract class AdjacentPairing<T> {
    @Nullable
    public abstract T getPrevious();
    public abstract T getElement();
    @Nullable
    public abstract T getNext();

    public static <T> AdjacentPairing<T> from(@Nullable T previous, T element, @Nullable T next){
      return new AutoValue_RxUtils_AdjacentPairing<>(previous, element, next);
    }
  }
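Since a one-item delay is acceptable here, the same previous/element/next triples could in principle be produced incrementally instead of collecting the whole stream. The following is only a plain-Java sketch of that bookkeeping, not an RxJava operator: `AdjacentEmitter`, `onNext`, and `onComplete` are hypothetical names chosen to mirror observer callbacks, each triple is emitted as a three-element list rather than an `AdjacentPairing`, and items are assumed to be non-null (as RxJava 2 requires). How this state would be wired into an `Observable` is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: remembers only the last two items, so memory use stays constant.
final class AdjacentEmitter<T> {
    private final Consumer<List<T>> downstream;
    private T previous; // neighbour before `current`, or null at the start
    private T current;  // item still waiting for its successor, or null if none yet

    AdjacentEmitter(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    // Call once per incoming item; emits the triple for the item received just before.
    void onNext(T item) {
        if (current != null) {
            downstream.accept(Arrays.asList(previous, current, item));
        }
        previous = current;
        current = item;
    }

    // Call when the input ends; flushes the last item, which has no successor.
    void onComplete() {
        if (current != null) {
            downstream.accept(Arrays.asList(previous, current, null));
        }
    }

    public static void main(String[] args) {
        List<List<String>> out = new ArrayList<>();
        AdjacentEmitter<String> emitter = new AdjacentEmitter<>(out::add);
        for (String s : new String[] {"a", "b", "c"}) {
            emitter.onNext(s);
        }
        emitter.onComplete();
        // Prints [[null, a, b], [a, b, c], [b, c, null]]
        System.out.println(out);
    }
}
```

Each triple is emitted as soon as the following item arrives, so the output lags the input by exactly one element, and the final triple is only emitted on completion.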
Cory Dolphin