
I am new to using Kafka and Spring Cloud Stream and need some help.

Setup

  • I have two Spring Boot applications, App-1 and App-2.
  • I am using Spring Cloud Stream with spring-cloud-stream-binder-kafka for asynchronous communication.
  • There is one topic, TOPIC-1.


Use Case
  • Suppose App-1 sends a message on topic TOPIC-1, which App-2 is listening to.
  • App-2 consumes the message and processes it successfully.
  • The consumer offset for that topic is then incremented.

Question

  • How can I implement a mechanism to delete only the successfully consumed messages' data from the Kafka logs after a specified period of time?

In Kafka, tracking what has been consumed is the responsibility of the consumer. So I guess there must be some Kafka message-log control mechanism in Spring Cloud Stream Kafka that I am not aware of.


NOTE 1: I know about the Kafka log retention time and disk properties, but those delete logs even for non-consumed messages.

NOTE 2: I have gone through this question, but it doesn't help.

amarjeetAnand

1 Answer


There is no such mechanism, that I am aware of, in Kafka; and certainly not in Spring Cloud Stream or the libraries it is based on. Kafka clients don't have access to such low-level constructs.

Also, consumer offsets are completely separate from the topic logs; in modern brokers, they are stored in a special topic.

EDIT

Per the comment below, the kafka-delete-records.sh command line tool can be used.
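For illustration, the tool takes a JSON file describing, per partition, the offset below which records should be deleted. The broker address, partition, and offset below are assumptions for the sketch, not values from the question:

```shell
# delete-records.json: delete everything before offset 100
# in partition 0 of TOPIC-1 (values are assumptions)
cat > delete-records.json <<'EOF'
{
  "partitions": [
    { "topic": "TOPIC-1", "partition": 0, "offset": 100 }
  ],
  "version": 1
}
EOF

# Run against the broker (address is an assumption)
kafka-delete-records.sh --bootstrap-server localhost:9092 \
    --offset-json-file delete-records.json
```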

Note that this uses the Scala AdminClient, which is not on the Spring Cloud Stream classpath by default (since 2.0).

However, the java AdminClient supports similar functionality:

/**
 * Delete records whose offset is smaller than the given offset of the corresponding partition.
 *
 * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
 * See the overload for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
 * @return                      The DeleteRecordsResult.
 */
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
    return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}

You can create an AdminClient using Boot's auto-configured KafkaAdmin:

AdminClient client = AdminClient.create(kafkaAdmin.getConfig());
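As a minimal sketch of the whole call (the bootstrap address, topic name, partition, and cut-off offset are assumptions, and a running broker is required; here the client is built from plain properties rather than the injected KafkaAdmin):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteConsumedRecords {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address is an assumption
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient client = AdminClient.create(props)) {
            // Delete all records in TOPIC-1 partition 0 with offsets below 100
            TopicPartition partition = new TopicPartition("TOPIC-1", 0);
            Map<TopicPartition, RecordsToDelete> recordsToDelete =
                    Map.of(partition, RecordsToDelete.beforeOffset(100L));

            DeleteRecordsResult result = client.deleteRecords(recordsToDelete);

            // Block until the broker confirms; the low watermark is the
            // partition's new start offset after deletion
            long lowWatermark = result.lowWatermarks()
                    .get(partition).get().lowWatermark();
            System.out.println("Partition now starts at offset " + lowWatermark);
        }
    }
}
```

Your application would typically invoke this on a schedule, passing the offsets your consumers have successfully processed, since the broker has no notion of "consumed" when deleting.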
Gary Russell