How can I restart or stop/resume the reactive messaging, e.g. after changing the interval time? This example is from the Quarkus guide: https://quarkus.io/guides/kafka-streams

@Outgoing("temperature-values")                             
public Flowable<KafkaRecord<Integer, String>> generate() {

    return Flowable.interval(500, TimeUnit.MILLISECONDS)    
            .onBackpressureDrop()
            .map(tick -> {
                WeatherStation station = stations.get(random.nextInt(stations.size()));
                double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
            });
}
Kevin

1 Answer

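One option is to replace the Flowable with a Subject and use a separate Flowable to feed values into that Subject. When you want to change something, such as the interval, dispose of the current Flowable subscription and create a new one that feeds the same Subject. The outgoing channel stays subscribed to the Subject the whole time, so it never has to be restarted.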

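class YourClass {

    // stations, random and LOG are the same fields as in the question's class.

    // Stable source that the outgoing channel subscribes to once.
    private final Subject<KafkaRecord<Integer, String>> temperatureSubject = BehaviorSubject.create();
    // Subscription of the Flowable that currently feeds the subject.
    private Disposable currentSubscription;

    // Call once at startup and again whenever the interval should change.
    public synchronized void setFlowable(long intervalMillis) {
        // Stop the previous feeder, if any, before starting a new one.
        if (currentSubscription != null && !currentSubscription.isDisposed()) {
            currentSubscription.dispose();
        }
        currentSubscription = Flowable.interval(intervalMillis, TimeUnit.MILLISECONDS)
                .map(it -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
                }).subscribe(it -> {
                    // Forward every generated record to the stable subject.
                    temperatureSubject.onNext(it);
                });
    }

    @Outgoing("temperature-values")
    public Flowable<KafkaRecord<Integer, String>> generate() {
        // The channel only ever sees the subject; feeders can be swapped behind it.
        return temperatureSubject.toFlowable(BackpressureStrategy.LATEST);
    }
}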
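
The same stop/restart idea can be sketched with JDK classes only, which makes it easy to try outside Quarkus. This is not the RxJava or Quarkus API: `RestartableTicker` and its methods are hypothetical names, `SubmissionPublisher` stands in for the Subject, and a `ScheduledFuture` stands in for the disposable subscription. It assumes Java 9 or later and drops items when a subscriber lags, roughly like `onBackpressureDrop()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: a stable publisher fed by a replaceable periodic task. */
final class RestartableTicker<T> implements AutoCloseable {

    // Daemon thread so a forgotten ticker does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "restartable-ticker");
                t.setDaemon(true);
                return t;
            });

    // Downstream subscribes here once; this plays the role of the Subject.
    private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>();
    private final Supplier<T> generator;

    // Handle of the task currently feeding the publisher (the "current subscription").
    private ScheduledFuture<?> current;

    RestartableTicker(Supplier<T> generator) {
        this.generator = generator;
    }

    /** The stable stream; it survives every stop() and start(). */
    Flow.Publisher<T> publisher() {
        return publisher;
    }

    /** Starts emitting, or restarts with a new period if already running. */
    synchronized void start(long periodMillis) {
        stop();
        current = scheduler.scheduleAtFixedRate(
                // offer(...) with an onDrop handler returning false drops the item
                // for a subscriber whose buffer is full instead of blocking.
                () -> publisher.offer(generator.get(), (subscriber, item) -> false),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Pauses emission; subscribers stay attached and simply see no new items. */
    synchronized void stop() {
        if (current != null) {
            current.cancel(false);
            current = null;
        }
    }

    synchronized boolean isRunning() {
        return current != null && !current.isCancelled();
    }

    @Override
    public void close() {
        stop();
        scheduler.shutdownNow();
        publisher.close();
    }
}
```

Calling `start(500)` and later `start(5000)` changes the interval without touching whoever subscribed to `publisher()`, and `stop()` followed by `start(...)` gives pause/resume. Whether a `Flow.Publisher` can be returned directly from an `@Outgoing` method depends on the reactive messaging version in use, so bridging it to the type your Quarkus version expects is left as an open assumption.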
Dmytro Chaban