
My goal is to consume from topic A, do some processing, and produce to topic B as a single atomic action. To achieve this I see two options:

  1. Use a spring-kafka @KafkaListener and a KafkaTemplate as described here.
  2. Use Kafka Streams exactly-once (EOS) functionality.

I have successfully verified option #1. By successfully, I mean that if my processing fails (an IllegalArgumentException is thrown), the message from topic A keeps being re-consumed by the KafkaListener. This is what I expect, since the offset is not committed and the DefaultAfterRollbackProcessor is used.
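
For reference, option #1 in my setup boils down to something like the following (a simplified sketch; the transactional ConcurrentKafkaListenerContainerFactory / KafkaTransactionManager wiring is omitted and the names are illustrative):

@Autowired
private KafkaTemplate<String, InvoiceEvents> kafkaTemplate;

@KafkaListener(topics = "A")
public void listen(InvoiceEvents invoiceCreatedEvent) {
    // may throw IllegalArgumentException("Tax calculation failed")
    int tax = taxService.getTaxForInvoice(invoiceCreatedEvent);
    InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder()
            .setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
    kafkaTemplate.send("B", taxCalculatedEvent);
    // if the exception is thrown, the transaction is rolled back and the
    // DefaultAfterRollbackProcessor re-seeks topic A, so the record is re-consumed
}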

I expect to see the same behaviour if, instead of a KafkaListener, I use a stream to consume from topic A, process, and send to topic B (option #2). But even though an IllegalArgumentException is thrown during processing, the message is consumed only once by the stream. Is this the expected behaviour?

In the Streams case the only configuration I have is the following:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");        
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

//required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
// see here: https://docs.spring.io/spring-kafka/reference/html/_reference.html#after-rollback
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}

My Stream is like this:

@Autowired
public SpecificAvroSerde<InvoiceEvents> eventSerde;

@Autowired
private TaxService taxService;

@Bean
public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {

    KStream<String, InvoiceEvents> kStream = builder.stream("A",
            Consumed.with(Serdes.String(), eventSerde));

      kStream
        .mapValues(v -> 
            {
                // get tax from possibly remote service
                // an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
                int tax = taxService.getTaxForInvoice(v);
                // create a TaxCalculated event
                InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
                log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
                return taxCalculatedEvent;
            })
        .to("B", Produced.with(Serdes.String(), eventSerde));

    return kStream;
}

The happy-path Streams scenario works: if no exception is thrown during processing, the message appears in topic B as expected.

My unit test:

@Test
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
    Mockito.when(taxService.getTaxForInvoice(any(InvoiceEvents.class)))
                        .thenThrow(new IllegalArgumentException("Tax calculation failed"));

    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));

    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");

    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);

    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
    Thread.sleep(30000);
// ...
}

Update: In my unit test I send 50 invoiceEvents (orderId=1,...,50), process them, and send them to a destination topic.

In my logs the behaviour I see is as follows:

invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IllegalArgumentException thrown
..new stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IllegalArgumentException thrown
...
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {**"id": "46"**, ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task.
..new stream starts..
invoiceEvent.**orderId = 46** → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed

Why, after the 2nd failure, does it re-consume from invoiceEvent.orderId = 46?

  • The question is, what do you see in your output topic? Enabling exactly-once guarantees that the output topic contains a result as if no error occurred. Whether data is re-consumed depends on whether the output was written successfully before the failure or not. It's a little unclear when you throw the exception? – Matthias J. Sax Jun 19 '18 at 18:42
  • Thanks for the quick reply Matthias! In my unit test I always throw an IllegalArgumentException from taxService.getTaxForInvoice(v). My expectation is that the message should be re-consumed, but I see only the initial consumption (i.e. no re-consumption). I am trying to understand whether my expectation is correct. – Vassilis Jun 19 '18 at 19:20
  • I edited my post to include the unit test. – Vassilis Jun 19 '18 at 19:28
  • If you throw an exception, the `StreamThread` will die and it will not be automatically restarted. After you observe the first exception (via uncaught exception handler), do you close() Kafka Streams, create a new one, and restart it? – Matthias J. Sax Jun 19 '18 at 21:49
  • Indeed, when an exception is thrown the StreamThread dies. I have configured a custom UncaughtExceptionHandler for my stream to create and start a new Stream, and now I see re-consumption of the non-committed message (updated post). What I observe now is the following: I send 10 messages to topic A and if the last one fails (taxService throws an exception), all of them are re-consumed. It is as if messages are consumed in batches and their offsets are committed all together. Is this true? Can I configure my stream to commit each message separately? Thanks once more! – Vassilis Jun 20 '18 at 17:30
  • Yes, under the hood, messages are received and committed "in batches" -- this does not impact processing -- it's still record-by-record semantics. It's important for good throughput not to commit every single message. Also compare https://stackoverflow.com/questions/43416178/how-to-commit-manually-with-kafka-stream – Matthias J. Sax Jun 20 '18 at 17:57
  • What puzzles me is the fact that if a single record read as part of a batch/poll fails, then the offsets of all messages (part of the batch/poll) are not committed (and of course no record is sent to topic B) and they are re-consumed in the next poll. This means that it is quite easy to end up in a "stuck" stream state, where you cannot make progress because a single message cannot be processed correctly. Do you agree that this is a valid scenario? Do you maybe have any online resource to direct me to for reading about proper strategies to tackle error-handling scenarios like this one? – Vassilis Jun 20 '18 at 19:46
  • That sounds correct -- however, even if you commit every single message, you will get stuck as well... You would need to "skip over" the bad message somehow. Best would be to filter it out in your application logic. There are other means to skip, using exception handlers you can provide to Kafka Streams. – Matthias J. Sax Jun 20 '18 at 22:39
  • And this brings me to my final question :-). I sent 50 ordered messages (orderId=1,..,50) and each message has a 20% chance to fail during processing. I set MAX_POLL_RECORDS_CONFIG=1 on my stream. After a while all messages are received in the destination topic as expected, which is great! But I see that, although the poll size is 1, the commit is not done on a per-message basis. I have updated my post with what I see in the logs. Could you explain when and how commits are done? Any resource? Thanks again! – Vassilis Jun 21 '18 at 14:19
  • Commits are configured based on wall-clock time via the parameter `commit.interval.ms`. If you want a commit per message, setting `max.poll.records=1` is correct but not sufficient. You would also need to manually request a commit for each record (you can insert a `transformValues` step that only forwards the data and requests the commit -- see the sketch after these comments). Cf. https://stackoverflow.com/questions/43416178/how-to-commit-manually-with-kafka-stream – Matthias J. Sax Jun 21 '18 at 17:24
  • @Vassilis Did you manage to solve your problem? Could you please post your answer and mark it as correct? – kkflf Jul 13 '18 at 11:02
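
For reference, a minimal sketch of the `transformValues` approach suggested above (assuming a Kafka Streams version where ValueTransformer no longer declares the deprecated punctuate, i.e. 2.0+; the skip-on-error handling is illustrative and not part of the original topology):

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TaxCalculationTransformer implements ValueTransformer<InvoiceEvents, InvoiceEvents> {

    private final TaxService taxService;
    private ProcessorContext context;

    public TaxCalculationTransformer(TaxService taxService) {
        this.taxService = taxService;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public InvoiceEvents transform(InvoiceEvents value) {
        try {
            int tax = taxService.getTaxForInvoice(value);
            return InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
        } catch (IllegalArgumentException e) {
            // skip the poison record instead of letting the StreamThread die
            return null;
        } finally {
            // request a commit after each record; the actual commit is still driven by
            // commit.interval.ms, so this is a request, not a strict per-message guarantee
            context.commit();
        }
    }

    @Override
    public void close() {
    }
}

It would be wired into the topology instead of the plain mapValues, filtering out the skipped records:

kStream
    .transformValues(() -> new TaxCalculationTransformer(taxService))
    .filter((key, value) -> value != null)
    .to("B", Produced.with(Serdes.String(), eventSerde));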

1 Answer


The key points to have option 2 (Streams Transactions) working are:

  • Assign a Thread.UncaughtExceptionHandler() so that you start a new StreamThread in case of any uncaught exception (by default the StreamThread dies; see the code snippet that follows). This can even happen if production to the Kafka broker fails; it does not have to be related to your business logic code in the stream.
  • Consider setting a policy for handling deserialization of messages (when you consume). Check DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG (javadoc). For example, decide whether you should ignore the record and consume the next message, or stop consuming from the relevant Kafka partition.
  • In the case of Streams, even if you set MAX_POLL_RECORDS_CONFIG=1 (one record per poll/batch), consumed offsets and produced messages are still not committed per message. This leads to cases like the one described in the question (see "Why after the 2nd failure, it re-consumes from invoiceEvent.orderId = 46?").
  • Kafka transactions simply do not work on Windows yet. The fix will be delivered in Kafka 1.1.1 (https://issues.apache.org/jira/browse/KAFKA-6052).
  • Consider checking how you handle serialisation exceptions (or, in general, exceptions during production) (here and here); a configuration sketch for the deserialization and production exception handlers follows after the code below.

    @Configuration
    @EnableKafkaStreams
    public class KafkaStreamsConfiguration {
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public StreamsConfig kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
            // this should be enough to enable transactions
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            return new StreamsConfig(props);
        }
    }
    
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) 
    {
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
                streamsBuilderFactoryBean.stop();
                log.debug("creating and starting a new StreamsThread ..");
                streamsBuilderFactoryBean.start();
            }
        });
        return streamsBuilderFactoryBean;
    }
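
For the deserialization and production exception handling points above, the handlers can be configured in the same props map. A sketch, using the built-in handler classes from org.apache.kafka.streams.errors (custom implementations of DeserializationExceptionHandler / ProductionExceptionHandler can be plugged in instead):

        // skip records that cannot be deserialized instead of killing the StreamThread
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
        // decide what happens when producing to the output topic fails (available since Kafka 1.1);
        // DefaultProductionExceptionHandler always fails, a custom handler can choose to continue
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                DefaultProductionExceptionHandler.class);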
    