The diagram above shows how I am implementing REST API CRUD operations with the Micronaut framework. The flow is unidirectional, but the controller needs to know the result of the operation performed by the Kafka consumer API.
For example:
- Fetching the list of items from the database is performed at the consumer level
- Add/Update/Delete operations are also performed at the consumer level
I have the following Micronaut reactive code at the consumer level (the listener) to get the list of products from MongoDB:
    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    public Flowable<Product> findByFreeText(String text) {
        LOG.info(String.format("Listener --> Listening value = %s", text));
        return Flowable.fromPublisher(repository.getCollection("product", Product.class)
                .find(new Document("$text",
                        new Document("$search", text)
                                .append("$caseSensitive", false)
                                .append("$diacriticSensitive", false)
                )));
    }
Producer interface:
    @KafkaClient
    public interface IProductProducer {
        @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
        Flowable<Product> findFreeText(String text);
    }
Implementation of the producer:
    @Override
    public Flowable<ProductViewModel> findFreeText(String text) {
        LOG.info("Manager --> Finding all the products");
        List<ProductViewModel> model = new ArrayList<>();
        iProductProducer.findFreeText(text).subscribe(item -> {
            System.out.println(item);
        }, error -> {
            System.out.println(error);
        });
        return Flowable.just(new ProductViewModel());
    }
The Micronaut Kafka documentation (https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerMethods) gives this example of receiving and returning reactive types:
    @Topic("reactive-products")
    public Single<Product> receive(
            @KafkaKey String brand,
            Single<Product> productFlowable) {
        return productFlowable.doOnSuccess((product) ->
                System.out.println("Got Product - " + product.getName() + " by " + brand)
        );
    }
In my producer implementation, the subscriber never receives a value:
    iProductProducer.findFreeText(text).subscribe(item -> {
        System.out.println(item);
    }, error -> {
        System.out.println(error);
    });
Is the above flow the correct way to perform REST API CRUD operations with Kafka?
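To make the question concrete, here is a minimal sketch of the request-reply flow I think the controller would need, simulated in plain Java with no Kafka or Micronaut involved. My understanding (which may be wrong) is that a @KafkaClient method only sends the record and does not hand back what the listener returns, so the reply would have to travel back separately and be matched to the request by a correlation id. The class RequestReplySketch, the in-memory map and the sample catalog are all hypothetical stand-ins, not part of my project or of any library.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for a request topic plus a reply topic.
// No Kafka or Micronaut classes are used; every name here is invented.
class RequestReplySketch {

    // Replies the caller is still waiting for, keyed by correlation id.
    private final Map<String, CompletableFuture<List<String>>> pending = new ConcurrentHashMap<>();

    // Stands in for the listener: maps a search text to matching product names.
    private final Function<String, List<String>> listener;

    RequestReplySketch(Function<String, List<String>> listener) {
        this.listener = listener;
    }

    // "Producer" side: register a pending reply, then publish the request.
    CompletableFuture<List<String>> findFreeText(String text) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<List<String>> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // Simulates the consumer service handling the request on another thread
        // and sending either a result or a failure back with the same id.
        CompletableFuture.supplyAsync(() -> listener.apply(text))
                .whenComplete((products, error) -> onReply(correlationId, products, error));
        return reply;
    }

    // "Reply topic" handler: completes the future matching the correlation id.
    void onReply(String correlationId, List<String> products, Throwable error) {
        CompletableFuture<List<String>> reply = pending.remove(correlationId);
        if (reply == null) {
            return; // unknown or already answered request
        }
        if (error != null) {
            reply.completeExceptionally(error);
        } else {
            reply.complete(products);
        }
    }

    // Number of requests that have not been answered yet.
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> catalog = List.of("phone case", "phone charger", "laptop bag");
        RequestReplySketch sketch = new RequestReplySketch(text ->
                catalog.stream().filter(name -> name.contains(text)).collect(Collectors.toList()));
        // The caller only sees products once the reply with its id has arrived.
        System.out.println(sketch.findFreeText("phone").join());
    }
}
```

In this sketch the caller gets a future that completes only when a reply carrying the same correlation id comes back, which is the behaviour I was expecting from subscribing to iProductProducer.findFreeText(text).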