Suppose I have two sources of historical events, and that the events of each source are in chronological order.
How can I merge these sources using Reactor so that the events in merged Flux are emitted in chronological order?
In RxNET, a combination of Observable.Generate()
and HistoricalScheduler
can be used to create an Observable
from a source of historical events such that the emissions are scheduled according to times of the events (as detailed here), but I cannot figure out an equivalent way in Reactor.
Maybe I could somehow use Flux.generate()
with VirtualTimeScheduler
?
Below is a toy example:
public class Program {
public record Event(Instant time, String id) {}
public static void main(String[] args) {
var source1 = Arrays.asList(
new Event(Instant.ofEpochMilli(10), "a"),
new Event(Instant.ofEpochMilli(30), "c"),
new Event(Instant.ofEpochMilli(50), "e")
);
var source2 = Arrays.asList(
new Event(Instant.ofEpochMilli(20), "b"),
new Event(Instant.ofEpochMilli(40), "d"),
new Event(Instant.ofEpochMilli(60), "f")
);
Flux.fromIterable(source1)
.mergeWith(Flux.fromIterable(source2))
.subscribe(e -> System.out.println(e.id));
// current output:
// a
// c
// e
// b
// d
// f
// desired output:
// a
// b
// c
// d
// e
// f
}
}