Spring Cloud Stream: Supplier keeps publishing events to Kafka; how to publish only one?
public static HashMap<String, Ticker> transactionsOfAccount = new HashMap<>(0);
public LinkedList<Ticker> lists = new LinkedList<>();
Producer.class
@Bean
public Supplier<Message<Ticker>> messageSupplier() {
    return () -> {
        if (tickerPublisher.lists.peek() != null) {
            Message<Ticker> msg = MessageBuilder
                    .withPayload(tickerPublisher.lists.peek())
                    .build();
            log.info("Total Size is {}", tickerPublisher.lists.size());
            log.info("Message: {}", msg.getPayload());
            tickerPublisher.lists.get(0).setStatus(Status.SUCESS);
            return msg;
        } else {
            return null;
        }
    };
}
RestController.class
@GetMapping(value = "/quote-mono", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<Ticker> getQuoteMono(@RequestParam("symbol") String symbol) {
    tickerPublisher.publisherMono(symbol);
    return mono;
}
PublisherService.class
public void publisherMono(String ticker) {
    String path = ticker.toUpperCase() + "/prices/realtime?api_key=" + apiKey;
    this.webClient
            .get()
            .uri(path)
            .retrieve()
            .bodyToMono(Ticker.class)
            .flatMap(data -> sendToKafka(ticker, data))
            .doOnNext(data -> {
                log.info("next events from published : {}", data);
                if (transactionsOfAccount.containsKey(ticker) && !lists.isEmpty()) {
                    log.info("list is clear now ");
                    transactionsOfAccount.clear();
                }
            })
            .subscribe(
                    data -> {
                        log.info("data is {}", data);
                        this.sinkMono.tryEmitValue((Ticker) data);
                    },
                    (err) -> log.info(String.valueOf(err)),
                    () -> {
                        log.info("Completed");
                    }
            );
    log.info(lists.toString());
}
The issue is that the Supplier keeps publishing duplicate events to the Kafka broker.
I need help finding where the problem is: how can I publish a single event when the response is a single object (a Mono)?
The REST response is:
{"last_price":245.3,"last_time":"2022-12-09T22:44:53.000Z","last_size":null,"bid_price":244.98,"bid_size":100,"ask_price":250.0,"ask_size":96,"open_price":246.4,"close_price":null,"high_price":248.2,"low_price":244.37,"exchange_volume":1168680,"market_volume":null,"updated_on":"2022-12-09T22:59:58.183Z","source":"bats_delayed","security":{"id":"sec_XaL6mg","ticker":"MSFT","exchange_ticker":"MSFT:UW","figi":"BBG000BPHFS9","composite_figi":"BBG000BPH459"}}
The expectation is that only one event is published per WebClient call.
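To narrow this down, here is a minimal plain-Java sketch (no Spring or Kafka involved) of what I suspect is happening. It assumes the framework invokes the Supplier repeatedly on a poller (by default about once per second in Spring Cloud Stream), and that `peek()` leaving the element in the list is why the same Ticker goes out on every invocation. The class name `PeekVsPollDemo`, its helper methods, and the use of `String` in place of `Ticker` are made up for illustration only.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical demo class; not part of the application above.
class PeekVsPollDemo {

    // Mirrors the supplier in the question: peek() returns the head
    // of the queue but leaves it in place.
    static Supplier<String> peekingSupplier(Queue<String> queue) {
        return queue::peek;
    }

    // Possible alternative: poll() removes the head, so each element
    // is handed out at most once; it returns null when the queue is empty.
    static Supplier<String> pollingSupplier(Queue<String> queue) {
        return queue::poll;
    }

    public static void main(String[] args) {
        Queue<String> peeked = new LinkedList<>();
        peeked.add("MSFT");
        Supplier<String> byPeek = peekingSupplier(peeked);

        Queue<String> polled = new LinkedList<>();
        polled.add("MSFT");
        Supplier<String> byPoll = pollingSupplier(polled);

        // Simulate three invocations by the framework's poller.
        for (int i = 0; i < 3; i++) {
            System.out.println("peek: " + byPeek.get()); // MSFT every time
        }
        for (int i = 0; i < 3; i++) {
            System.out.println("poll: " + byPoll.get()); // MSFT, then null, null
        }
    }
}
```

If this guess is right, replacing `peek()` with `poll()` in `messageSupplier()` might be one way to send each Ticker only once, since (as far as I understand) a polled Supplier that returns null sends nothing. I have not confirmed that this is the recommended approach.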