
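I have the following Kafka Streams topology:

    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("TopicA", Consumed.with(Serdes.String(), new SpecificAvroSerde<TestObject>()))
        .filter((key, value) -> value != null)                  // drop tombstones
        .selectKey((key, value) -> value.getSomeProperty())     // re-key by a field of the value
        .groupByKey(Grouped.with(Serdes.Long(), new SpecificAvroSerde<TestObject>()))
        .reduce((oldValue, newValue) -> newValue,               // keep the latest value per key
            Materialized.as("someStore"));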

This works as expected, but I can't figure out how to deal with tombstone messages for TestObject, even if I remove

    .filter((key, value) -> value != null)

The problem is selectKey(): when a tombstone arrives, the value is null, so value.getSomeProperty() cannot be evaluated, and I have no new key under which to send the tombstone downstream.

How would you deal with this problem?

posthumecaver

1 Answer


You can use transform() instead of selectKey() and store the old <key,value> pair in a state store. This way, when <key,null> is processed, you can look up the previous value in the store, re-extract the new key you computed from it earlier, and send a tombstone for that key.
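A minimal plain-Java model of that idea, not actual Kafka Streams code: a `HashMap` stands in for the state store, and `RekeyingTransformer.transform` mirrors what a `Transformer`'s `transform(key, value)` would return (a record to forward, or null for nothing). `TestObject` here is a hypothetical stand-in for the Avro class, and `Rekeyed` / `RekeyingTransformer` are illustrative names. In a real topology the store would be registered with `StreamsBuilder#addStateStore` and its name passed to `KStream#transform(...)` (newer Kafka Streams releases deprecate `transform()` in favor of `process()`).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Avro-generated TestObject from the question.
final class TestObject {
    private final long someProperty;

    TestObject(long someProperty) {
        this.someProperty = someProperty;
    }

    long getSomeProperty() {
        return someProperty;
    }
}

// A re-keyed record, playing the role of KeyValue<Long, TestObject>.
final class Rekeyed<K, V> {
    final K key;
    final V value;

    Rekeyed(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Plain-Java model of a transform() step that replaces selectKey().
// The HashMap stands in for a KeyValueStore<String, TestObject> keyed by the ORIGINAL key.
final class RekeyingTransformer {
    private final Map<String, TestObject> previousValues = new HashMap<>();

    // Returns the record to forward, or null to forward nothing.
    Rekeyed<Long, TestObject> transform(String key, TestObject value) {
        if (value == null) {
            // Tombstone: recover the last value seen for this original key.
            TestObject previous = previousValues.remove(key);
            if (previous == null) {
                return null; // nothing was ever re-keyed for this key, so nothing to delete
            }
            // Re-extract the new key from the previous value and delete under that key.
            return new Rekeyed<>(previous.getSomeProperty(), null);
        }
        previousValues.put(key, value);
        return new Rekeyed<>(value.getSomeProperty(), value);
    }
}
```

For example, `transform("a", new TestObject(42L))` forwards `<42, value>`, and a later `transform("a", null)` forwards `<42, null>`. As the next paragraph explains, reduce() would drop that null value, so in practice this step would forward a surrogate instead.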

However, reduce() cannot process any record with a null key or a null value (such records are dropped). Thus, you will need to use a surrogate value instead of null to get the record into the reduce function. When the reducer receives the surrogate, it can return null.
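A plain-Java model of the surrogate approach, again not the Kafka API itself: `ReduceModel.process` imitates how `reduce()` treats records (null keys and null values are dropped, the first value for a key is stored without calling the reducer, and a null result from the reducer deletes the entry). The surrogate is represented by an assumed `deleteMarker` flag on a hypothetical `TestObject`; with Avro, this would require a matching schema field, which is an assumption rather than something from the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for the Avro TestObject; deleteMarker is an ASSUMED extra
// field that lets a non-null "surrogate" value represent a delete.
final class TestObject {
    final String payload;
    final boolean deleteMarker;

    private TestObject(String payload, boolean deleteMarker) {
        this.payload = payload;
        this.deleteMarker = deleteMarker;
    }

    static TestObject of(String payload) {
        return new TestObject(payload, false);
    }

    static TestObject surrogate() {
        return new TestObject(null, true);
    }
}

// Plain-Java model of KGroupedStream#reduce writing into a key-value store.
final class ReduceModel {
    final Map<Long, TestObject> store = new HashMap<>();

    // The question's reducer ("keep the newest value"), extended so that
    // receiving the surrogate turns into a delete by returning null.
    static final BinaryOperator<TestObject> REDUCER =
        (oldValue, newValue) -> newValue.deleteMarker ? null : newValue;

    void process(Long key, TestObject value) {
        if (key == null || value == null) {
            return; // reduce() drops records with a null key or a null value
        }
        TestObject old = store.get(key);
        // The first value for a key is stored as-is, without calling the reducer.
        TestObject result = (old == null) ? value : REDUCER.apply(old, value);
        if (result == null) {
            store.remove(key); // a null aggregate deletes the entry
        } else {
            store.put(key, result);
        }
    }
}
```

Because the first value for a key bypasses the reducer, the upstream step should only emit a surrogate for keys it has actually forwarded before; otherwise the surrogate itself would end up stored.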

Matthias J. Sax
  • Thanks for the answer. I don't want to hijack this thread with another question, but I'd like your opinion on this one (https://stackoverflow.com/questions/54979742/what-should-be-the-replication-factor-of-changelog-repartition-topics): is it bad to leave the replication factor at 1 for changelog/repartition topics in a production environment? – posthumecaver Mar 06 '19 at 06:50
  • There is already a good answer to that question. I agree with Bill that you should set the replication factor to 3 in a production environment. Note that for existing topics, you will need to reconfigure them manually (cf. https://issues.apache.org/jira/browse/KAFKA-7803). – Matthias J. Sax Mar 07 '19 at 02:15
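For internal topics that Kafka Streams creates, the replication factor is taken from the Streams setting `replication.factor` (`StreamsConfig.REPLICATION_FACTOR_CONFIG`). A minimal sketch, using plain string keys so it compiles without the Kafka jars; `StreamsProps`, the application id and the broker address are placeholders. As the comment above notes, this only affects topics Streams creates from then on; existing changelog/repartition topics keep their current replication factor.

```java
import java.util.Properties;

// Hypothetical helper that builds Kafka Streams settings with replication factor 3
// for the internal changelog and repartition topics.
final class StreamsProps {
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("replication.factor", 3);               // StreamsConfig.REPLICATION_FACTOR_CONFIG
        return props;
    }
}
```

The resulting `Properties` would then be passed to the `KafkaStreams` constructor together with the built topology.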