
I am running a stateless processor with Kafka Streams 1.0 against a Kafka 1.0.1 broker.

The problem is that the CustomProcessor gets closed every few seconds, which results in a rebalance. I am using the following configs:

session.timeout.ms=15000

heartbeat.interval.ms=3000 // kept at no more than 1/3 of session.timeout.ms

max.poll.interval.ms=Integer.MAX_VALUE // made this large because I am doing computationally intensive operations (NLP) that might take up to 10 minutes to process 1 Kafka message

max.poll.records=1

Despite this configuration and my understanding of how the Kafka timeout configurations work, I see the consumer rebalancing every few seconds.

I have already gone through the articles and Stack Overflow questions below on how to tune long-running operations while avoiding a very long session timeout that would delay failure detection. However, I still see this unexpected behavior, unless I am misunderstanding something.
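For illustration, the settings listed above could be assembled in Java roughly as follows. This is a minimal sketch that only builds a plain `java.util.Properties` from the literal config keys; the class name `StreamsTimeoutConfig` and the method `build` are hypothetical, and a real application would also need `application.id`, `bootstrap.servers`, and the Kafka Streams dependency.

```java
import java.util.Properties;

public class StreamsTimeoutConfig {

    // Builds the timeout-related settings from the question using literal config keys.
    // Hypothetical helper: it does not start a Streams application by itself.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "15000");
        // Kept at no more than one third of session.timeout.ms.
        props.setProperty("heartbeat.interval.ms", "3000");
        // Effectively disables the poll-interval timeout for long-running processing.
        props.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        // Fetch one record per poll so a single slow message does not hold back a batch.
        props.setProperty("max.poll.records", "1");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Keeping the values in one place like this makes it easier to confirm what the client was actually started with when comparing against the logs.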

KIP-62

Diff between session.timeout.ms and max.poll.interval

Kafka kstreams processing timeout

For the consumer environment, I have 8 machines with 16 cores each, consuming from 1 topic with 100 partitions, and I am following the practice recommended in this Confluent doc.

Any pointers?

Karim Tawfik
  • What is the root cause for the rebalance? Can you clarify the dependency between closing a processor and the rebalance? It seems, one happens first (root cause) triggering the other one. Did you check the logs? – Matthias J. Sax Apr 20 '18 at 13:33
  • When I see `processor.close()`, I immediately see a pause of a few seconds (roughly the `session.timeout.ms` duration) and then a rebalance happens. The application logs are fine: no exceptions or anything else that would exit the process() method uncleanly – Karim Tawfik Apr 20 '18 at 14:22
  • @MatthiasJ.Sax My understanding is that close() should not be called in the normal flow of the application. In other words, close() is not called for every message, just like init() is not. Is that correct? – Karim Tawfik Apr 20 '18 at 14:23
  • Yes, init() and close() should only be called when a partition is assigned/revoked. close() might also be called if an exception is thrown and a task fails. Thus, it's puzzling to me what actually happens in the case you describe. If `session.timeout.ms` hits, there should be a log entry on the broker that hosts the group coordinator of the consumer group. Maybe DEBUG logs show more information? – Matthias J. Sax Apr 20 '18 at 16:42
  • Thanks @MatthiasJ.Sax I will try to check the broker logs and enable the DEBUG logs and will post the updates. – Karim Tawfik Apr 22 '18 at 10:31
  • I enabled debug log level on broker, and the logs are very very noisy, do you know which log I should expect or grep for? – Karim Tawfik Apr 23 '18 at 15:01
  • This is what I got from more looking at the logs `DEBUG [SocketServer brokerId=1] Connection with /X.X.X.X disconnected (org.apache.kafka.common.network.Selector) java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235) at ` – Karim Tawfik Apr 23 '18 at 17:03
  • Here is the rest of the above log `org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:545) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483) at org.apache.kafka.common.network.Selector.poll(Selector.java:412) at kafka.network.Processor.poll(SocketServer.scala:551) at kafka.network.Processor.run(SocketServer.scala:468) at java.lang.Thread.run(Thread.java:748)` – Karim Tawfik Apr 23 '18 at 17:04
  • A disconnect should not close the processor -- really unclear to me what's happening there... – Matthias J. Sax Apr 24 '18 at 17:16

1 Answer


I figured it out. After a lot of debugging and enabling verbose logging for both the Kafka Streams client and the broker, it came down to 2 things:

  1. There is a critical bug in Streams 1.0.0 (HERE), so I upgraded my client version from 1.0.0 to 1.0.1.
  2. I updated the value of the Streams property default.deserialization.exception.handler from org.apache.kafka.streams.errors.LogAndFailExceptionHandler to org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.
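The handler change in item 2 amounts to a single property. A minimal sketch, assuming the configuration is held in a plain `java.util.Properties`; the class name `DeserializationHandlerConfig` and the method `withLogAndContinue` are hypothetical, while the key and handler class names are the ones quoted above.

```java
import java.util.Properties;

public class DeserializationHandlerConfig {

    static final String HANDLER_KEY = "default.deserialization.exception.handler";
    static final String LOG_AND_CONTINUE =
            "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Returns a copy of the given settings with the handler switched to log-and-continue.
    // Hypothetical helper: the input object is left unmodified.
    static Properties withLogAndContinue(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(HANDLER_KEY, LOG_AND_CONTINUE);
        return props;
    }

    public static void main(String[] args) {
        Properties before = new Properties();
        before.setProperty(HANDLER_KEY,
                "org.apache.kafka.streams.errors.LogAndFailExceptionHandler");
        Properties after = withLogAndContinue(before);
        System.out.println(HANDLER_KEY + "=" + after.getProperty(HANDLER_KEY));
    }
}
```

Note that with LogAndContinueExceptionHandler a record that fails deserialization is logged and skipped instead of failing the task, so it is worth watching the logs for skipped records after making this change.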

After these 2 changes, everything ran perfectly with no restarts. I am using Grafana to monitor restarts, and there has not been a single one in the past 48 hours.

I might do more troubleshooting to determine which of the 2 items above is the real fix, but I am in a hurry to deploy to production. If anybody is interested in picking it up from there, go ahead; otherwise, once I have time I will do the further analysis and update the answer!

So happy to get this fixed!!!

Karim Tawfik