I can't get my Java Kafka client to work. Symptoms:
The logs show "Discovered coordinator", then less than one second later "Marking the coordinator ... dead". No further output appears after that.
Debugging the code shows that org.apache.kafka.clients.consumer.KafkaConsumer.poll() never returns. The code is stuck in this do-while loop in the ConsumerNetworkClient class:
public boolean awaitMetadataUpdate(long timeout) {
    long startMs = this.time.milliseconds();
    int version = this.metadata.requestUpdate();
    do {
        this.poll(timeout);
    } while (this.metadata.version() == version && this.time.milliseconds() - startMs < timeout);
    return this.metadata.version() > version;
}
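To check my reading of that loop, here is a minimal stand-alone sketch of its exit condition. It uses no Kafka classes: AwaitLoopSketch, awaitUpdate and fakeClock are hypothetical names of mine, and the suppliers stand in for metadata.version() and time.milliseconds(). As far as I can tell, the loop can only end when the metadata version changes or the timeout elapses.

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

public class AwaitLoopSketch {

    // Hypothetical stand-in for awaitMetadataUpdate: `version` plays the role of
    // metadata.version() and `clock` plays the role of time.milliseconds().
    public static boolean awaitUpdate(IntSupplier version, LongSupplier clock, long timeout) {
        long startMs = clock.getAsLong();
        int initial = version.getAsInt();
        do {
            // The real method calls this.poll(timeout) here; the sketch does no I/O.
        } while (version.getAsInt() == initial && clock.getAsLong() - startMs < timeout);
        return version.getAsInt() > initial;
    }

    // Fake clock that advances 10 ms on every read, so the sketch never sleeps.
    public static LongSupplier fakeClock() {
        long[] now = {0};
        return () -> now[0] += 10;
    }

    public static void main(String[] args) {
        // Version never changes: only the timeout can end the loop.
        System.out.println(awaitUpdate(() -> 1, fakeClock(), 50));

        // Version changes after a few reads: the loop ends even with a huge timeout.
        int[] reads = {0};
        System.out.println(awaitUpdate(() -> reads[0]++ < 3 ? 1 : 2, fakeClock(), Long.MAX_VALUE));
    }
}
```

If the timeout passed in is very large and the metadata version never changes, the loop would keep spinning, which would match what I see. I have not confirmed the actual timeout value in my run.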
The logs say:
2019-09-25 15:25:45.268 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = foo
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class com.mycompany.KafkaMessageJsonNodeDeserializer
2019-09-25 15:25:45.312 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.3
2019-09-25 15:25:45.312 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 26ddb9e3197be39a
2019-09-25 15:25:47.700 [pool-2-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator ad0c03f60f39:9092 (id: 2147483647 rack: null) for group foo.
2019-09-25 15:25:47.705 [pool-2-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator ad0c03f60f39:9092 (id: 2147483647 rack: null) dead for group foo
If debug logging were turned on, the logs would also contain a message like:
Coordinator discovery failed for group foo, refreshing metadata
More details:
I'm running Kafka inside a Docker container. The console consumer, run inside the container, works fine and receives messages. My app, where the issue occurs, runs outside the container.
The docker run command includes -p 2181:2181 -p 9001:9001 -p 9092:9092.
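One thing I can check from the host: the log above reports the coordinator as ad0c03f60f39:9092, not the localhost:9092 I configured in bootstrap.servers. The sketch below only tests whether a host name resolves from where my app runs. CoordinatorHostCheck and resolves are hypothetical names of mine, and a failed lookup would be a hint rather than proof of the cause.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class CoordinatorHostCheck {

    // Returns true if the JVM can resolve the given host name or IP literal.
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the coordinator host name taken from the log output above.
        String host = args.length > 0 ? args[0] : "ad0c03f60f39";
        System.out.println(host + " resolves: " + resolves(host));
    }
}
```

Running it on the host without arguments checks the coordinator name from my log. Passing another name as the first argument checks that one instead.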
The stack looks like this when the Kafka client gets stuck in the loop:
awaitMetadataUpdate:134, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
ensureCoordinatorReady:226, AbstractCoordinator (org.apache.kafka.clients.consumer.internals)
ensureCoordinatorReady:203, AbstractCoordinator (org.apache.kafka.clients.consumer.internals)
poll:286, ConsumerCoordinator (org.apache.kafka.clients.consumer.internals)
pollOnce:1078, KafkaConsumer (org.apache.kafka.clients.consumer)
poll:1043, KafkaConsumer (org.apache.kafka.clients.consumer)