[Image: diagram]

The diagram above shows my setup for REST API CRUD operations using the Micronaut framework. The flow is unidirectional, but the controller needs to know the result of the operation performed by the Kafka consumer.

For example

  1. Fetching the list of items from the database is performed at the consumer level
  2. Add/Update/Delete operations are performed at the consumer level

I have the following Micronaut reactive code at the consumer level (listener) to get the list of products from MongoDB:

@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
public Flowable<Product> findByFreeText(String text) {
    LOG.info(String.format("Listener --> Listening value = %s", text));
    return Flowable.fromPublisher(repository.getCollection("product", Product.class)
            .find(new Document("$text",
                    new Document("$search", text)
                            .append("$caseSensitive", false)
                            .append("$diacriticSensitive", false)
            )));
}

Producer interface

@KafkaClient
public interface IProductProducer {
    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    Flowable<Product> findFreeText(String text);
}

Implementation of producer

@Override
public Flowable<ProductViewModel> findFreeText(String text) {
    LOG.info("Manager --> Finding all the products");
    List<ProductViewModel> model = new ArrayList<>();
    iProductProducer.findFreeText(text).subscribe(item -> {
        System.out.println(item);
    }, error -> {
        System.out.println(error);
    });
    return Flowable.just(new ProductViewModel());
}

The Micronaut Kafka documentation (https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerMethods) gives this example under "Receiving and returning Reactive Types":

@Topic("reactive-products")
public Single<Product> receive(
        @KafkaKey String brand,  
        Single<Product> productFlowable) { 
    return productFlowable.doOnSuccess((product) ->
            System.out.println("Got Product - " + product.getName() + " by " + brand) 
    );
}

In my producer implementation, I never receive a value when I subscribe:

iProductProducer.findFreeText(text).subscribe(item -> {
    System.out.println(item);
}, error -> {
    System.out.println(error);
});

Is the above flow the correct way to perform REST API CRUD operations with Kafka?
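
For context, one common way to get a result back over a message broker is a request/reply pattern: the request carries a correlation id, the consuming service publishes its result to a separate reply topic, and the requesting side runs its own listener on that reply topic. The sketch below only illustrates that idea. The two queues are in-memory stand-ins for Kafka topics, and every name in it (`RequestReplySketch`, `Message`, `send`, `serveOne`, `receiveOneReply`) is hypothetical rather than part of Micronaut or Kafka.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class RequestReplySketch {
    /** Hypothetical message: a correlation id plus a payload. */
    static final class Message {
        final String correlationId;
        final String payload;

        Message(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // Assumption: in-memory stand-ins for a request topic and a reply topic.
    static final Queue<Message> requestTopic = new ConcurrentLinkedQueue<>();
    static final Queue<Message> replyTopic = new ConcurrentLinkedQueue<>();

    // Requests still waiting for a reply, keyed by correlation id.
    static final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Requesting side: publish the request and return a future for the reply. */
    static CompletableFuture<String> send(String query) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        requestTopic.add(new Message(id, query));
        return future;
    }

    /** Consuming service: handle one request and publish the result to the reply topic. */
    static void serveOne() {
        Message request = requestTopic.poll();
        if (request == null) {
            return; // nothing to handle
        }
        replyTopic.add(new Message(request.correlationId, "results for " + request.payload));
    }

    /** Requesting side's own listener: match one reply to its pending request. */
    static void receiveOneReply() {
        Message reply = replyTopic.poll();
        if (reply == null) {
            return; // no reply yet
        }
        CompletableFuture<String> future = pending.remove(reply.correlationId);
        if (future != null) {
            future.complete(reply.payload);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = send("shoes");
        serveOne();          // the consumer processes the request
        receiveOneReply();   // the requester consumes the reply topic
        System.out.println(result.join());
    }
}
```

The point of the sketch is that the future completes only because the requesting side reads the reply topic itself; publishing the request does not, on its own, deliver the consumer's result.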

San Jaisy
  • I don't think I see the need for the consumer to pull from the database and then forward to the HTTP client when you could just return the database result directly (it's only a single document, right?). For the CUD events, having all of those in a producer makes sense, but only as a type of audit log / database buffer – OneCricketeer Nov 05 '20 at 13:30
  • @OneCricketeer I have a microservice architecture, so if another microservice wants to get the full product list, I would need to call it through an HTTP client. That is not a good solution in my case: I am already using Kafka, so why would I use an HTTP client? – San Jaisy Nov 05 '20 at 13:55
  • Kafka is not a replacement for a database client, and you can't just get a single record from Kafka like you seem to want to do. Besides, you're already calling `repository.getCollection` over HTTP – OneCricketeer Nov 05 '20 at 15:21
  • @OneCricketeer even for CUD operations, the UI needs to know whether the data was created, updated, or deleted. Don't we need to send the response back to the producer from the Kafka consumer? – San Jaisy Nov 05 '20 at 15:24
  • Ideally, producers and consumers have no knowledge of each other. You could always add some `"action"` field to your data, though – OneCricketeer Nov 05 '20 at 15:27
  • Then how does this work in Micronaut? See the "Receiving and returning Reactive Types" section of https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerMethods – San Jaisy Nov 05 '20 at 15:30
  • Yes, I am aware of how that works, but you can create a higher-level object than "Product" that includes application-specific metadata, such as which operation is being performed, like `AppEvent { action ; body }` – OneCricketeer Nov 05 '20 at 15:56
  • @OneCricketeer - can you please show me an example? It would be a great help – San Jaisy Nov 05 '20 at 16:24
  • I don't know what your product class looks like. But you simply make a new class that wraps it, then produce/consume it instead. And I also don't know if this really solves the problem. I currently work with several REST+Kafka applications that interact with a database, and we still use database clients where it makes sense, and KTables in other places where there's a caching layer from a consumer rather than a consumer instance itself – OneCricketeer Nov 05 '20 at 17:10
  • Getting the data from the database table works fine; I am able to get the data from the database. The issue is with returning the value back to the producer: the producer never receives a value – San Jaisy Nov 05 '20 at 17:36
  • Are you saying that `repository.getCollection("product", Product.class)` isn't working? Or that the Flowable isn't working? – OneCricketeer Nov 05 '20 at 17:41
  • `Flowable.fromPublisher(repository.getCollection("product", Product.class).find(new Document("$text", new Document("$search", text).append("$caseSensitive", false).append("$diacriticSensitive", false))));` is working fine; if I subscribe to it, I am able to get the value. The issue is with the implementation of the producer: `iProductProducer.findFreeText(text).subscribe()` never receives the value – San Jaisy Nov 05 '20 at 17:42
  • So, you are missing an `@Topic` annotation there after the `@Override`, and also, maybe your parameter should match the docs with `Single productFlowable)`? – OneCricketeer Nov 05 '20 at 17:46
  • I have a topic in the producer interface, and `@Override` marks the implementation of that interface. The code with the parameter `Single productFlowable)` is from the Micronaut example; I don't have that code – San Jaisy Nov 05 '20 at 17:55
  • Sorry, I'm very confused. In Kafka terms, producers only push out to a topic. You cannot consume from instances of producers directly. I don't know what `iProductProducer` is, but you seem to want to move data around internally using ReactiveX rather than Kafka topics. – OneCricketeer Nov 05 '20 at 18:03
  • But the idea here is that `Flowable findByFreeText(String text) -> @Topic` goes into a remote Kafka server. You probably should not be subscribing to the returned Flowable of this yourself. You would instead have some `@KafkaListener` + `@Topic` annotated function that consumes the topic data (which would not necessarily be the most recent record from your find method) – OneCricketeer Nov 05 '20 at 18:10
  • Please have a look at my code: https://github.com/anandjaisy/MicronautKafka/blob/master/src/main/java/fete/bird/service/consumer/ProductListener.java and https://github.com/anandjaisy/MicronautKafka/blob/master/src/main/java/fete/bird/service/producer/manager/ProductManager.java – San Jaisy Nov 06 '20 at 02:47
  • @OneCricketeer I think I am using Kafka in the wrong way. It should not be used for applications that need a variety of publish/subscribe and point-to-point request/reply messaging capabilities. Kafka is meant for asynchronous programming, not for synchronous programming – San Jaisy Nov 09 '20 at 02:47
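
The wrapper suggested in the comments (`AppEvent { action ; body }`) could look roughly like the sketch below. It is only an illustration under assumptions: the `Action` values, the accessor names, and the `describe` helper are hypothetical, and a plain `String` stands in for the `Product` class, which is not shown in the question.

```java
import java.util.Objects;

public class AppEventSketch {
    /** Hypothetical set of operations; the names are assumptions for illustration. */
    enum Action { CREATE, UPDATE, DELETE }

    /** Hypothetical envelope that carries the operation alongside the payload. */
    static final class AppEvent<T> {
        private final Action action;
        private final T body;

        AppEvent(Action action, T body) {
            this.action = Objects.requireNonNull(action, "action");
            this.body = body;
        }

        Action getAction() { return action; }

        T getBody() { return body; }
    }

    /** Consumer-side dispatch on the action carried by the event. */
    static String describe(AppEvent<String> event) {
        switch (event.getAction()) {
            case CREATE: return "created " + event.getBody();
            case UPDATE: return "updated " + event.getBody();
            case DELETE: return "deleted " + event.getBody();
            default: throw new IllegalStateException("unknown action");
        }
    }

    public static void main(String[] args) {
        // A String stands in for the Product payload in this sketch.
        AppEvent<String> event = new AppEvent<>(Action.UPDATE, "product-42");
        System.out.println(describe(event));
    }
}
```

Producing and consuming the envelope instead of the bare payload lets a single topic carry all three operations, with the consumer branching on `getAction()`.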

0 Answers