
I have a topic named SOURCE that contains a stream of two kinds of messages, A and B. I have written a Kafka Streams application that consumes that topic, finds an A and a B with the same correlation ID, aggregates them into a new message C, and puts C on the output topic DESTINATION.

Sometimes an A without a B (or vice versa) is put on the SOURCE topic. I have created a queryable state store so I can look at these dangling messages, but now I'd like to delete a specific message from the intermediate topic. I'm guessing it's just a matter of getting a message with the right key (which I have) and null as the body into the intermediate topic. The question is: what is the best way?

  1. Produce a special clear-message to SOURCE that would cause the aggregated message to become null
  2. Write a message with null data directly to the intermediate topic
  3. Some other way; maybe Kafka Streams already has an API call for this?

Bonus question: If I know that I don't want messages to sit in the intermediate topic for longer than 6 months, can I instruct Kafka Streams to create the intermediate topic with a 6-month retention, or should I create the topic manually myself before I run the app?

Richo
  • This isn't really what Kafka is intended for (hence why you're having trouble figuring out how to do it). You shouldn't be deleting messages from a Kafka topic. Think of it more as a queue than a data store. Ideally you'd only read each message once - when you consume it. If there's no B for an A or vice versa, it won't be written to DESTINATION, so is there really a problem? – Ben Watson Feb 26 '18 at 09:29
  • Short answer - you cannot (duplicate of https://stackoverflow.com/questions/35775489/delete-a-specific-kafka-message). Some links of interest: http://www.benstopford.com/2017/10/06/confluent-schema-registry-failover-failback/ https://stackoverflow.com/questions/28586008/delete-message-after-consuming-it-in-kafka – Artur Biesiadowski Feb 26 '18 at 10:09
  • Ben, if it's not written to DESTINATION it will stay in the intermediate topic forever, so the topic will grow without bound. Artur, it's 100% possible to delete messages from a topic with log compaction; that's what Kafka Streams does under the hood. I'm asking what the best practice is for doing it myself – Richo Feb 26 '18 at 13:24
  • I think it would be better to write a message to the SOURCE topic: this allows you to clean up internal state while processing the message. If you write directly to an internal topic that Kafka Streams creates, you might corrupt the application's state. – Matthias J. Sax Feb 26 '18 at 16:45
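Option 1, writing a clear-message to SOURCE, could look roughly like this from the command line. This is a sketch under assumptions: `CLEAR` is a hypothetical marker payload, `make_clear_records` is a hypothetical helper, and the Streams application itself would have to be changed to recognise the marker. Because `KGroupedStream` aggregations ignore records whose value is null, the marker has to be a non-null payload; the aggregator could then return null for it, which should remove the key from the state store and write a tombstone to its changelog (worth verifying against your Kafka Streams version).

```shell
#!/usr/bin/env bash
# Sketch only: "CLEAR" is a hypothetical marker payload that the Streams
# application must be modified to recognise.

# Print one key:value line per correlation ID, in the format that
# kafka-console-producer.sh reads with parse.key=true and key.separator=:
make_clear_records() {
  local id
  for id in "$@"; do
    printf '%s:%s\n' "$id" "CLEAR"
  done
}

# Example usage (needs a running broker, so it is left commented out;
# older Kafka releases use --broker-list instead of --bootstrap-server):
# make_clear_records corr-42 corr-43 |
#   kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
#     --topic SOURCE --property parse.key=true --property key.separator=:
```

Keeping the record formatting in a separate function means the output can be inspected before it is piped into the producer.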

1 Answer


Bonus question: If I know that I don't want messages to sit in the intermediate topic for longer than 6 months, can I instruct Kafka Streams to create the intermediate topic with a 6-month retention, or should I create the topic manually myself before I run the app?

Yes, you can set the retention time, for example:

kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config retention.ms=16070400000
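The value 16070400000 in these commands is 186 days (6 × 31) in milliseconds, the unit `retention.ms` uses. A small hypothetical helper makes the conversion explicit, so a different retention period can be computed the same way:

```shell
#!/usr/bin/env bash
# retention.ms is in milliseconds: days * 24 h * 60 min * 60 s * 1000 ms
days_to_retention_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_retention_ms 186   # prints 16070400000 (6 x 31 days)
days_to_retention_ms 183   # prints 15811200000 (about half a 365-day year)
```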

Or while creating the topic:

kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --if-not-exists --config retention.ms=16070400000 --topic my_topic
  • Thanks, but that's not an option for me. I either want to create it myself with the correct retention or instruct Kafka Streams to set it up correctly; I don't want to have to go in and edit the topics after they've been created – Richo Feb 26 '18 at 12:09
  • I have extended the answer :-) – Seweryn Habdank-Wojewódzki Feb 26 '18 at 13:38
  • And is it good practice to create the topics manually, or should you let Kafka Streams create them? They are part of Kafka Streams' internal workings, after all – Richo Feb 26 '18 at 14:05
  • We create topics manually, especially since we use other technical features (like SSL), and organizationally we want control over who uses the Kafka broker and how. For example, we have designed “our” own naming convention, which fits corporate standards. Of course, I understand scenarios where topics are created automatically; in the beginning we used this option. – Seweryn Habdank-Wojewódzki Feb 26 '18 at 15:22
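On the bonus question of instructing Kafka Streams itself: Kafka Streams passes any setting prefixed with `topic.` (built with `StreamsConfig.topicPrefix(...)` in Java) on to the internal topics it creates. Below is a minimal sketch, assuming the application loads its configuration from a properties file; the file name and the `application.id` and `bootstrap.servers` values are placeholders. In the Java DSL, `Materialized.withLoggingEnabled(Map)` is a per-store alternative for a single changelog topic. Two caveats: as far as we know, these settings only take effect when Streams creates a topic, so topics that already exist are not altered; and `retention.ms` only deletes data under the `delete` cleanup policy, so a compacted changelog still keeps the latest value per key.

```shell
#!/usr/bin/env bash
# Sketch: write a Kafka Streams config file. The file name,
# application.id and bootstrap.servers are placeholder values.
cat > streams.properties <<'EOF'
application.id=ab-correlator
bootstrap.servers=localhost:9092
# "topic."-prefixed keys are applied to the internal topics Streams creates
# (186 days in milliseconds)
topic.retention.ms=16070400000
EOF
```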