
How can I filter the Confluent Cloud cluster audit log by environment while consuming from the confluent-audit-log-events topic in a Spring Boot application?

I'm using a Camel route to consume from the audit topic:

@Override
public void configure() throws Exception {
    from("kafka:confluent-audit-log-events")
            .process(exchange -> log.info(kafkaDetails(exchange)));
}

// Extracts the record value from the exchange as a String.
private String kafkaDetails(Exchange exchange) {
    return exchange.getIn().getBody(String.class);
}

  • It would be useful if you showed an example record from the topic that you'd like to filter on, but have you even tried `kafkaStream.stream("confluent-audit-log-events").filter()`? – OneCricketeer Aug 11 '21 at 15:52
  • @OneCricketeer, I'm using the Camel route as I posted in the above description! – Vishal Dhanani Aug 11 '21 at 15:57
  • Have you read this? https://camel.apache.org/manual/latest/routes.html#Routes-Filters – OneCricketeer Aug 11 '21 at 15:59
  • @OneCricketeer Yeah, I read that one, but I have no idea how I can filter the audit data using a header! – Vishal Dhanani Aug 11 '21 at 16:13
  • You don't need a header. You just use a `Predicate` like `exchangeBody -> exchangeBody.contains("my-environment")` – OneCricketeer Aug 11 '21 at 16:18
  • Although, it is better if you parse the data into a Java class, then use Bean expressions on the filter - https://camel.apache.org/components/latest/languages/bean-language.html – OneCricketeer Aug 11 '21 at 16:23
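
A minimal sketch of the Predicate-based filter suggested in the comments above (not the asker's actual code); the class name and the environment identifier are placeholders:

import org.apache.camel.builder.RouteBuilder;

public class AuditLogFilterRoute extends RouteBuilder {

    // Placeholder: substitute the environment/cluster identifier you want to match.
    private static final String ENVIRONMENT_ID = "env-abc123";

    @Override
    public void configure() {
        from("kafka:confluent-audit-log-events")
                // Keep only audit records whose JSON body mentions the target environment.
                .filter(exchange -> {
                    String body = exchange.getIn().getBody(String.class);
                    return body != null && body.contains(ENVIRONMENT_ID);
                })
                .log("${body}");
    }
}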

1 Answer


I finally figured it out by adding the filter below before the `process` step.

.filter(body().convertToString().contains(kafkaClusterId))
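
For context, a sketch of where that filter sits in the route; the `kafkaClusterId` value shown is a placeholder, and in practice it would typically be injected from application properties:

import org.apache.camel.builder.RouteBuilder;

public class AuditLogRoute extends RouteBuilder {

    // Assumed to hold the Confluent Cloud cluster ID to match; placeholder value.
    private final String kafkaClusterId = "lkc-12345";

    @Override
    public void configure() {
        from("kafka:confluent-audit-log-events")
                // Drop any audit record whose body does not contain the cluster ID.
                .filter(body().convertToString().contains(kafkaClusterId))
                .log("${body}");
    }
}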