
I have been trying to send messages to AWS MSK using the Python confluent-kafka library. I want to ensure exactly-once delivery of each message, which is why I am using a transactional producer. I am currently sending 500k messages per transaction.

The produce phase of the transaction works fine and gives us the required throughput; however, when I commit the transactions, some of them randomly time out.

In the normal flow, when the issue does not occur, committing the transaction takes almost no time (a few seconds). I have set a timeout of 10 minutes on commit_transaction, and some commits still time out.

Here is the code that I am using:

import json
import logging

from confluent_kafka import Producer

connection_config = {
    "bootstrap.servers": server_url,  # broker endpoint, defined elsewhere
    "security.protocol": "SASL_SSL",
    "sasl.username": "test",
    "sasl.password": "test",
    "sasl.mechanism": "SCRAM-SHA-512",
    "enable.idempotence": True,
    "transaction.timeout.ms": 1200000,
    "acks": "all",
    "queue.buffering.max.messages": 200,
    "retries": 50
}

p = Producer(connection_config)
p.init_transactions()
p.begin_transaction()
logging.info("Connection successful, writing messages..")
for index, record in enumerate(data):
    try:
        p.produce(topic_name, json.dumps(record).encode('utf-8'), callback=receipt)
        p.poll(0)
    except BufferError:
        p.flush()  # local queue is full: drain it, then retry this record once
        p.produce(topic_name, json.dumps(record).encode('utf-8'), callback=receipt)
logging.info("Flushing remaining messages to kafka")
p.flush()
logging.info("Sending complete for producer, committing transaction")
p.commit_transaction(int(producer_timeout))
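
When `commit_transaction()` raises, the `KafkaError` carried inside the `KafkaException` reports whether the transaction must be aborted and re-sent (`txn_requires_abort()`) or whether the error is retriable or fatal. A minimal sketch of that decision, assuming a transactional producer; the helper name `commit_or_abort` is mine, not library API, and the error attribute is read defensively so the sketch runs without a live broker:

```python
def commit_or_abort(producer, timeout=600.0):
    """Try to commit the open transaction.

    Returns True on success; aborts and returns False when the error
    says the transaction must be abandoned (caller should re-produce
    the batch in a new transaction); re-raises everything else, which
    covers retriable timeouts and fatal errors.
    """
    try:
        producer.commit_transaction(timeout)
        return True
    except Exception as exc:
        # confluent-kafka wraps the KafkaError as the first exception arg.
        err = exc.args[0] if exc.args else None
        if err is not None and getattr(err, "txn_requires_abort", lambda: False)():
            producer.abort_transaction(timeout)
            return False
        raise
```

On `False`, the caller would call `begin_transaction()` again and re-send the whole batch, which is another reason to keep batches small.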

Here is the configuration that I am using for MSK (Kafka):

auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=50
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.max.timeout.ms=1200000
num.network.threads=10

Error on timeout:

cimpl.KafkaException: KafkaError{code=_TIMED_OUT,val=-185,str="Transactional API operation (commit_transaction) timed out"}

I have looked at the server logs and could not find anything relevant to why this is happening. Can someone please help me debug this issue? Thanks a lot.

I have tried decreasing the number of messages per transaction and can see that the failure rate improves with fewer messages; however, everything I have found online suggests that more messages per transaction is better.
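
Since fewer messages per transaction empirically helps, splitting the batch into several smaller transactions is one way to exploit that. A rough sketch, assuming a `confluent_kafka.Producer` on which `init_transactions()` has already been called; the helper names `chunked` and `send_in_transactions` are mine, not library API:

```python
import json

def chunked(seq, size):
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

def send_in_transactions(producer, topic, records, chunk_size=50_000):
    """Produce `records` as a series of smaller transactions.

    Each chunk is sent and committed independently, so a failed
    commit only forces re-sending one chunk, not all 500k records.
    """
    for batch in chunked(records, chunk_size):
        producer.begin_transaction()
        for record in batch:
            producer.produce(topic, json.dumps(record).encode("utf-8"))
            producer.poll(0)  # serve delivery callbacks
        producer.flush()
        producer.commit_transaction()
```

The `chunk_size` of 50,000 is illustrative; the right value is whatever keeps commit times acceptable on your cluster.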

  • No, half a million events per transaction is not better. What if event 10 out of that entire batch fails? Then you rollback 499990 events? – OneCricketeer May 05 '23 at 13:35

1 Answer


Records are being produced faster than they can be delivered to the broker. Try adjusting these producer-side properties:

buffer.memory
max.block.ms
batch.size
linger.ms
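
Note that `buffer.memory` and `max.block.ms` are the Java client's property names; the question uses confluent-kafka (librdkafka), where the closest equivalents are the queue-buffering settings. A rough mapping with illustrative values (not tuned recommendations):

```python
# Approximate librdkafka equivalents of the Java producer settings above.
# Values here are only examples; tune them against your own workload.
producer_buffering = {
    "queue.buffering.max.messages": 100_000,  # local queue capacity in messages
    "queue.buffering.max.kbytes": 1_048_576,  # local queue capacity in KB (cf. buffer.memory)
    "batch.size": 1_000_000,                  # max batch size in bytes
    "linger.ms": 100,                         # wait up to this long to fill batches
}
```

In particular, the question's `queue.buffering.max.messages` of 200 is far below the librdkafka default of 100,000, which explains the frequent `BufferError`s the code works around.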