I'm writing a streaming Twitter client that simply throws the stream up onto a TV. I'm observing the stream with RxJava.

When the stream comes in a burst, I want to buffer it and slow it down so that each tweet is displayed for at least 6 seconds. Then, during the quiet times, any buffer that has built up will gradually empty itself by pulling the head of the queue, one tweet every 6 seconds. If a new tweet comes in and finds an empty queue (and more than 6 seconds have passed since the last one was displayed), I want it to be displayed immediately.

I imagine the stream looking like that described here:

Raw:      --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

And I understand that the question posed there has a solution. But I just can't wrap my head around its answer. Here is my solution:

myObservable
    .concatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long l) {
            return Observable.concat(
                Observable.just(l),
                Observable.<Long>empty().delay(6, TimeUnit.SECONDS)
            );
        }
    })
    .subscribe(...);

So, my question is: Is this too naïve of an approach? Where is the buffering/backpressure happening? Is there a better solution?

dgmltn

1 Answer

Looks like you want to delay a message if it came too soon relative to the previous message. You have to track the last target emission time and schedule a new emission after it:

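// RxJava 1.x imports
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class SpanOutV2 {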
    public static void main(String[] args) {
        Observable<Integer> source = Observable.just(0, 5, 13)
                .concatMapEager(v -> Observable.just(v).delay(v, TimeUnit.SECONDS));

        long minSpan = 6;
        TimeUnit unit = TimeUnit.SECONDS;
        Scheduler scheduler = Schedulers.computation();

        long minSpanMillis = TimeUnit.MILLISECONDS.convert(minSpan, unit);

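        // defer() gives each subscriber its own lastEmission state.
        Observable.defer(() -> {
            // Target emission time (in ms) of the most recently scheduled item.
            AtomicLong lastEmission = new AtomicLong();

            return source
            .concatMapEager(v -> {
                long now = scheduler.now();
                long emission = lastEmission.get();

                // Too soon after the previous item: push this one to the next free slot.
                if (emission + minSpanMillis > now) {
                    lastEmission.set(emission + minSpanMillis);
                    return Observable.just(v).delay(emission + minSpanMillis - now, TimeUnit.MILLISECONDS);
                }
                // Enough time has passed: emit immediately.
                lastEmission.set(now);
                return Observable.just(v);
            });
        })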
        .timeInterval()
        .toBlocking()
        .subscribe(System.out::println);
    }
}

Here, each source item is delayed by its own value in seconds, measured from the start of the program, so the items arrive at T = 0, 5 and 13 seconds. After the operator, 0 should be emitted immediately, 5 at T = 6 seconds and 13 at T = 13 seconds. concatMapEager makes sure the order and timing are kept. Since only standard operators are in use, backpressure and unsubscription compose naturally.
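To check the expected timings without running RxJava, the scheduling rule can be modeled in plain Java. This is only a sketch of the arithmetic, not the operator itself: the class `SpanOutSketch` and the method `emissionTimes` are hypothetical names invented for illustration, and times are plain numbers (seconds here) rather than scheduler clock values.

```java
import java.util.Arrays;

// Hypothetical helper, not part of RxJava: models only the timing rule.
final class SpanOutSketch {

    /**
     * Given non-decreasing arrival times, returns the time at which each item
     * would be emitted so that consecutive emissions are at least minSpan apart.
     */
    static long[] emissionTimes(long[] arrivals, long minSpan) {
        long[] out = new long[arrivals.length];
        long last = 0;
        for (int i = 0; i < arrivals.length; i++) {
            long now = arrivals[i];
            long earliest = last + minSpan;
            // The first item is never delayed; later ones wait for the next free slot.
            // (The RxJava version gets the same effect because lastEmission starts
            // at 0 while scheduler.now() is a large epoch value.)
            out[i] = (i > 0 && earliest > now) ? earliest : now;
            last = out[i];
        }
        return out;
    }

    public static void main(String[] args) {
        // Same arrivals as the example source: T = 0, 5 and 13 seconds.
        System.out.println(Arrays.toString(emissionTimes(new long[] {0, 5, 13}, 6)));
        // prints [0, 6, 13]
    }
}
```

A burst behaves the same way: four items all arriving at T = 2 with a minimum span of 3 come out at 2, 5, 8 and 11, which is the "buffer drains one item per interval" behavior asked for in the question.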

akarnokd
  • Yes! It definitely looks like this will solve my problem. Out of curiosity, is there an advantage to delaying before the message (like you've done) over delaying after the message (like I've done)? I think both solutions will work. Maybe there's a subtle Rx difference I don't understand. – dgmltn Apr 10 '16 at 20:59
  • Fewer operators involved, less overhead. – akarnokd Apr 11 '16 at 07:19