I'm trying to do a simple topic listing with the Kafka AdminClient, like this:
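    import java.util.Collection;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;
    import org.apache.kafka.clients.admin.TopicListing;

    // bootstrapServers is a static field defined elsewhere in the class
    private static Collection<TopicListing> getTopicListing(boolean isInternal)
            throws InterruptedException, ExecutionException {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put("client.id", "java-admin-client");
        // try-with-resources closes the admin client once the listing is fetched
        try (AdminClient client = AdminClient.create(properties)) {
            ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(isInternal);
            return client.listTopics(options).listings().get();
        }
    }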
The Kafka brokers are running on MSK (AWS managed Kafka). The client has to run in a Kubernetes pod for various security reasons. I've tried raising the pod's memory up to 1.5Gi (which seems crazy for a pod that does this one thing), and it still fails.
Error:
"level":"INFO","logger_name":"o.a.kafka.common.utils.AppInfoParser","thread_name":"kafka-admin-client-thread | java-admin-client","message":"App info kafka.admin.client for java-admin-client unregistered"}
"level":"INFO","logger_name":"o.a.k.c.a.internals.AdminMetadataManager","thread_name":"kafka-admin-client-thread | java-admin-client","message":"[AdminClient clientId=java-admin-client] Metadata update failed","stack_trace":"org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata\n"}
"level":"INFO","logger_name":"o.a.kafka.clients.admin.KafkaAdminClient","thread_name":"kafka-admin-client-thread | java-admin-client","message":"[AdminClient clientId=java-admin-client] Timed out 2 remaining operation(s) during close."}
"level":"INFO","logger_name":"org.apache.kafka.common.metrics.Metrics","thread_name":"kafka-admin-client-thread | java-admin-client","message":"Metrics scheduler closed"}
"level":"INFO","logger_name":"org.apache.kafka.common.metrics.Metrics","thread_name":"kafka-admin-client-thread | java-admin-client","message":"Closing reporter org.apache.kafka.common.metrics.JmxReporter"}
"level":"INFO","logger_name":"org.apache.kafka.common.metrics.Metrics","thread_name":"kafka-admin-client-thread | java-admin-client","message":"Metrics reporters closed"}
"level":"ERROR","logger_name":"o.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread | java-admin-client",
"message":"Uncaught exception in thread 'kafka-admin-client-thread | java-admin-client':",
"stack_trace":"java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(Unknown Source)
at java.nio.ByteBuffer.allocate(Unknown Source)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1400)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331)
at java.lang.Thread.run(Unknown Source)"}
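
For context on where the allocation in the trace happens: as far as I understand, `NetworkReceive.readFrom` first reads a 4-byte big-endian size prefix from the socket and then allocates a buffer of that size. The sketch below is not Kafka's code; it only illustrates that step with a hypothetical `decodeSize` helper. The sample bytes are an assumption (the first four bytes of a TLS alert record, which a plaintext client might receive from a TLS listener); nothing in the logs above confirms that this is what the broker actually sent.

```java
import java.nio.ByteBuffer;

// Illustration only: mimics how a length-prefixed protocol sizes its receive buffer.
class SizePrefixDemo {

    // Hypothetical helper: interpret the first 4 bytes on the wire as a
    // big-endian int, the way a size prefix would be read.
    static int decodeSize(byte[] firstFourBytes) {
        return ByteBuffer.wrap(firstFourBytes).getInt();
    }

    public static void main(String[] args) {
        // Assumed bytes, not taken from the logs: a TLS alert record header.
        byte[] assumedBytes = {0x15, 0x03, 0x03, 0x00};
        int size = decodeSize(assumedBytes);

        System.out.println("Buffer size the client would request: " + size + " bytes");
        System.out.println("JVM max heap: " + Runtime.getRuntime().maxMemory() + " bytes");
    }
}
```

If the decoded size is larger than the JVM's max heap, `ByteBuffer.allocate` would fail with the same `OutOfMemoryError` no matter how small the real topic list is. Whether that applies here depends on the listener and security settings, which the post does not show.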