
I've read many articles describing many different configurations to achieve exactly-once processing.

Here is my producer config:

final Map<String, Object> props = Maps.newConcurrentMap();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);


props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all"); 
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");
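
The producer template is then built from these props roughly like this (a simplified sketch, not my exact config class):

// simplified sketch: SenderOptions (reactor-kafka) picks up the transactional settings above
final SenderOptions<String, Message> senderOptions = SenderOptions.create(props);
final ReactiveKafkaProducerTemplate<String, Message> kafkaProducerTemplate =
        new ReactiveKafkaProducerTemplate<>(senderOptions);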

Here is my consumer config:

final Map<String, Object> props = Maps.newHashMap();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
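
Similarly, the consumer template is created along these lines (again a simplified sketch):

// simplified sketch: ReceiverOptions (reactor-kafka) built from the props above, subscribed to the topic
final ReceiverOptions<String, Message> receiverOptions =
        ReceiverOptions.<String, Message>create(props)
                .subscription(Collections.singletonList(mytopic));
final ReactiveKafkaConsumerTemplate<String, Message> kafkaConsumerTemplate =
        new ReactiveKafkaConsumerTemplate<>(receiverOptions);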

I read these [sample scenarios][1] and tried to follow them, but I'm running into some issues.

Here is my producer code:

@Override
public Mono<SenderResult<Void>> buy(Message msg) {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    return kafkaProducerTemplate.transactionManager().begin().then(kafkaProducerTemplate.send(mytopic, msg));
}

My Consumer code:

@Override
public void run(ApplicationArguments arg0) throws Exception {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = kafkaConfig.getKafkaConsumerTemplate(mytopic, Message.class);

    final Flux<ConsumerRecord<String, Message>> flux = kafkaConsumerTemplate.receiveExactlyOnce(kafkaProducerTemplate.transactionManager())
            .concatMap(receiverRecordFlux -> receiverRecordFlux);

    flux.subscribe(record -> {
        final Message message = record.value();

        System.out.printf("received message: timestamp=%s key=%s value=%s\n",
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                message);

        transactionService.processAndSendToNextTopic(message)
                .doOnSuccess(aVoid -> kafkaProducerTemplate.transactionManager().commit())
                .subscribe();
    });
}

I always get the following error when trying to produce and consume messages:

Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

1 Answer

See the javadocs for `receiveExactlyOnce`:

/**
 * Returns a {@link Flux} of consumer record batches that may be used for exactly once
 * delivery semantics. A new transaction is started for each inner Flux and it is the
 * responsibility of the consuming application to commit or abort the transaction
 * using {@link TransactionManager#commit()} or {@link TransactionManager#abort()}
 * after processing the Flux.
 */

`begin()` has already been called for you, so you don't need to call it again:

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
    this.ackMode = AckMode.EXACTLY_ONCE;
    Flux<ConsumerRecords<K, V>> flux = withDoOnRequest(createConsumerFlux());
    return  flux.map(consumerRecords -> transactionManager.begin()
                             .then(Mono.fromCallable(() -> awaitingTransaction.getAndSet(true)))
                             .thenMany(transactionalRecords(transactionManager, consumerRecords)))
                             .publishOn(transactionManager.scheduler());
}
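
So with `receiveExactlyOnce` the transaction is already open when you get the records; you just commit (or abort) it after processing. In your consumer that could look roughly like this (a sketch only; `transactionService.processAndSendToNextTopic()` is your own code and must publish through the same transactional producer):

// rough sketch, assuming the templates are typed <String, Message> as in the question
final TransactionManager txManager = kafkaProducerTemplate.transactionManager();

kafkaConsumerTemplate.receiveExactlyOnce(txManager)
        .concatMap(batch -> batch
                // process each record; sends through the transactional producer join the open transaction
                .concatMap(record -> transactionService.processAndSendToNextTopic(record.value()))
                // commit the transaction that receiveExactlyOnce started, or abort it on failure
                .then(txManager.commit())
                .onErrorResume(e -> txManager.abort().then(Mono.error(e))))
        .subscribe();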
  • Thanks! Do you see any issue with the consumer/producer config? Also, do I need to close the producer after sending? – user1955934 Aug 01 '19 at 15:16
  • I am not familiar with the reactive client (or reactor at all, really) but according to the javadocs, you won't get the next flux of consumer records until the current one is complete, so reusing the producer should be fine. One thing I do notice is that if you have multiple partitions and multiple instances, proper zombie fencing won't work because, for that to work properly, the transactional.id has to be based on the `topic/partition/group.id`. So, if a partition moves to another instance after a rebalance the new consumer uses the same transactional id. You'd need a consumer per partition. – Gary Russell Aug 01 '19 at 16:07
  • From what I read, “We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts.” I thought I only needed to set a unique id for each instance that runs a producer – user1955934 Aug 01 '19 at 22:55
  • That is true for producer-only transactions, but not for exactly once for read-process-write. I'll look for the article that explains that tomorrow. – Gary Russell Aug 01 '19 at 23:09
  • [Here is one discussion](https://stackoverflow.com/questions/50335227/how-to-pick-a-kafka-transaction-id). But there was a Confluent article that I can't find right now. – Gary Russell Aug 01 '19 at 23:36
  • [Here is another explanation](https://tgrez.github.io/posts/2019-04-13-kafka-transactions.html). This situation is handled by the `KafkaListenerContainer` for the non-reactive consumer but is a limitation of the reactive client. – Gary Russell Aug 01 '19 at 23:50
  • Thanks for the references, Gary. How do I obtain the partition number that should go into the transactional id used to initialize the producer? That is, how does a producer know which topic partition it will be sending to? – user1955934 Aug 02 '19 at 02:08
  • It can't be done with the reactive client; the transactional.id is set on the producer and there is only one producer used for the whole Flux of records returned from the poll. With the listener container (for exactly once) we start a new transaction for each record (with a producer with the proper `transactional.id` retrieved from a cache). – Gary Russell Aug 02 '19 at 12:50
  • For my use case (write-read-process with reactive clients), is there no way to have an exactly-once processing guarantee? – user1955934 Aug 03 '19 at 00:32
  • Unless I am missing something, I believe it will only work if you have one instance of the application, or if you manually assign partitions to instances (rather than using group management, and each instance needs a different `transactional.id`), so that the same `transactional.id` will always be used for each partition. – Gary Russell Aug 03 '19 at 14:49
  • OK, so after more reading, it seems that I have misinterpreted things. If I am only sending one message at a time to a topic, surely I don't need to use a transaction? No need for a transactional id; just set enable.idempotence to true? – user1955934 Aug 04 '19 at 15:05
  • I am not sure what you mean by `write-read-process`. Yes, an idempotent producer doesn't "need" transactions. But, you are using `receiveExactlyOnce` on the consumer side; that is intended for `read-process-write` and requires transactions. If you are not using that pattern then just use a simple consumer to receive the idempotently written records. But you will get at least once semantics there. – Gary Russell Aug 04 '19 at 15:51
  • Gary, doesn't an idempotent producer prevent duplicate sending? I guess a consumer failure may lead to it consuming the same record more than once? How does a transaction help in this case? By write-process-read I really meant that the communication is initiated from the producer. – user1955934 Aug 04 '19 at 22:36
  • I think you are reading too much into "exactly once". Yes, an idempotent producer means records will be placed in the log exactly once, so that's simple. The problem is on the consumer side. Exactly-once guarantees there are limited to Kafka only: `read-process-write` means that any consumed message will either have its offset committed (together with any records it produces) successfully as a unit, or not. There is no guarantee that "process" will be called exactly once. So if you, e.g., update a DB, you need your own idempotency guarantees. It sounds like your producer/consumer are different apps. – Gary Russell Aug 04 '19 at 23:11
  • If that's the case then your producer app doesn't need transactions. But they add no value on the consumer side; you need your own idempotent logic to deal with possible duplicate deliveries. There is no value in using `receiveExactlyOnce` unless that app also produces records; it does not guarantee any exactly once semantics for a `read-process` app, only `read-process-write`, and even then, only for Kafka interactions. – Gary Russell Aug 04 '19 at 23:19
  • So for exactly-once processing, should it be enough if the producer is idempotent and the consumer manually commits after processing is done and the new message is sent? By the way, do you think that for a producer to be idempotent I need to set a transactional id, or should enable.idempotence set to true be enough? – user1955934 Aug 05 '19 at 02:09
  • Your question is not clear `"and new message is sent."` by who? This commentary is too long; the admins don't like it. If you still have questions I suggest you start a new one and exactly explain the use case you are trying to solve. – Gary Russell Aug 05 '19 at 13:34
  • Hi Gary, I have updated my question details. Basically I am now using read-process-write as well. I have used the above example and am still getting `Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION`, even though I'm only testing with a single-partition topic and in a happy case – user1955934 Aug 06 '19 at 00:49
  • As I said, you should ask a new question. I already answered your original question here. – Gary Russell Aug 06 '19 at 01:38
  • Alright, I reverted the edit back to the original question and created a new one here: https://stackoverflow.com/questions/57367942/reactive-kafka-exactly-once-processing-with-transaction . Thanks for your time – user1955934 Aug 06 '19 at 01:56