
Suppose I have two sources of historical events, and that the events of each source are in chronological order.

How can I merge these sources using Reactor so that the events in the merged Flux are emitted in chronological order?

In Rx.NET, a combination of Observable.Generate() and HistoricalScheduler can be used to create an Observable from a source of historical events such that the emissions are scheduled according to the times of the events (as detailed here), but I cannot figure out an equivalent way to do this in Reactor.

Maybe I could somehow use Flux.generate() with VirtualTimeScheduler?

Below is a toy example:

import java.time.Instant;
import java.util.Arrays;
import reactor.core.publisher.Flux;

public class Program {

  public record Event(Instant time, String id) {}

  public static void main(String[] args) {

    var source1 = Arrays.asList(
        new Event(Instant.ofEpochMilli(10), "a"),
        new Event(Instant.ofEpochMilli(30), "c"),
        new Event(Instant.ofEpochMilli(50), "e")
    );

    var source2 = Arrays.asList(
        new Event(Instant.ofEpochMilli(20), "b"),
        new Event(Instant.ofEpochMilli(40), "d"),
        new Event(Instant.ofEpochMilli(60), "f")
    );

    Flux.fromIterable(source1)
        .mergeWith(Flux.fromIterable(source2))
        .subscribe(e -> System.out.println(e.id));

    // current output:
    // a
    // c
    // e
    // b
    // d
    // f

    // desired output:
    // a
    // b
    // c
    // d
    // e
    // f
   
  }
}
Mark Rotteveel
jack

1 Answer


You can use the mergeComparingWith operator and provide a Comparator like this:

Flux.fromIterable(source1)
    // Instant is Comparable, so comparing by the time() accessor is enough
    .mergeComparingWith(Flux.fromIterable(source2), Comparator.comparing(Event::time))
    .subscribe(e -> System.out.println(e.id()));

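It produces an ordered merged sequence by repeatedly emitting the smallest of the values currently available at the head of each source, as defined by the Comparator. This only yields a fully sorted result if each source is itself already sorted, which is the case here.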
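To make the "pick the smallest head" idea concrete, here is a minimal plain-Java sketch of the same two-way merge over lists. It is only an illustration of the ordering logic, not Reactor's implementation; the class name MergeSketch and the method mergeSorted are hypothetical names introduced for this example, and the tie-breaking rule (left source first) is an assumption.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class MergeSketch {

  record Event(Instant time, String id) {}

  // Merges two lists that are each already sorted according to `order`,
  // by repeatedly taking the smaller of the two current head elements.
  static <T> List<T> mergeSorted(List<T> left, List<T> right, Comparator<? super T> order) {
    List<T> merged = new ArrayList<>(left.size() + right.size());
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      // On a tie the left element goes first (an arbitrary choice for this sketch).
      if (order.compare(left.get(i), right.get(j)) <= 0) {
        merged.add(left.get(i++));
      } else {
        merged.add(right.get(j++));
      }
    }
    // One side is exhausted; the remainder of the other is already in order.
    merged.addAll(left.subList(i, left.size()));
    merged.addAll(right.subList(j, right.size()));
    return merged;
  }

  public static void main(String[] args) {
    var source1 = List.of(
        new Event(Instant.ofEpochMilli(10), "a"),
        new Event(Instant.ofEpochMilli(30), "c"),
        new Event(Instant.ofEpochMilli(50), "e"));
    var source2 = List.of(
        new Event(Instant.ofEpochMilli(20), "b"),
        new Event(Instant.ofEpochMilli(40), "d"),
        new Event(Instant.ofEpochMilli(60), "f"));

    // Prints the ids a, b, c, d, e, f, one per line.
    mergeSorted(source1, source2, Comparator.comparing(Event::time))
        .forEach(e -> System.out.println(e.id()));
  }
}
```

If either input list were not sorted by time, the result would not be sorted either, which is why the precondition matters for the operator as well.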

lkatiforis
  • Thanks, this is helpful. Is it possible to now apply time-based operators (e.g., `window`) to the merged sequence so that the operators respect event time rather than the actual time? – jack Jan 03 '22 at 11:59
  • @jack not really. at that point, the fact that the objects have a timestamp or something isn't reflected in the timing of the emissions: fromIterable will just replay them as fast as possible. it might be possible with some clever use of operators like concatMap and Mono.delay, but you'd have to manage turning timestamps into proper delays... – Simon Baslé Jan 05 '22 at 10:29
  • that said, `HistoricalScheduler` in Rx seems pretty similar to `VirtualTimeScheduler` in Reactor. I wonder if some techniques can be implemented to use that. `window(Duration, Scheduler)` could be used but the tricky part is in how to advance the scheduler – Simon Baslé Jan 05 '22 at 10:48
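
The "turning timestamps into proper delays" step mentioned in the comments is plain arithmetic on the event times, so it can be sketched without Reactor. The following is a rough illustration only: the class EventDelays and the method delaysBetween are hypothetical names, and starting the first event with a zero delay is an assumption of this sketch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class EventDelays {

  record Event(Instant time, String id) {}

  // For a chronologically sorted list, returns how long each event should wait
  // after the previous one. The first event gets a zero delay (an assumption).
  static List<Duration> delaysBetween(List<Event> sortedEvents) {
    List<Duration> delays = new ArrayList<>(sortedEvents.size());
    Instant previous = null;
    for (Event e : sortedEvents) {
      delays.add(previous == null ? Duration.ZERO : Duration.between(previous, e.time()));
      previous = e.time();
    }
    return delays;
  }

  public static void main(String[] args) {
    var merged = List.of(
        new Event(Instant.ofEpochMilli(10), "a"),
        new Event(Instant.ofEpochMilli(20), "b"),
        new Event(Instant.ofEpochMilli(30), "c"));

    // Prints one delay in milliseconds per event.
    delaysBetween(merged).forEach(d -> System.out.println(d.toMillis()));
  }
}
```

Each resulting Duration could in principle be handed to Mono.delay inside concatMap, as the comment suggests, possibly on a VirtualTimeScheduler; that wiring, and how to advance the scheduler, is not shown or tested here.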