I have the following Kafka producer:
package dathanb;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class Producer {
    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);

        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            System.out.println(kafkaProducer.partitionsFor("kafka-test"));
            for (int i = 0; i < 1000; i++) {
                System.out.println(i);
                var metadataFuture = kafkaProducer.send(new ProducerRecord<>("kafka-test", 0, null, "test message - " + i), callback());
                System.out.println(metadataFuture.get().partition());
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Callback callback() {
        return (metadata, exception) -> {
            System.out.println(metadata);
            System.out.println(exception);
        };
    }
}
The call System.out.println(kafkaProducer.partitionsFor("kafka-test")); prints what looks like the right partition config:
[Partition(topic = kafka-test, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
However, System.out.println(metadataFuture.get().partition()); never prints anything, and the callback is never called: the application appears to hang indefinitely. I don't even get a timeout, which I would expect.
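Note that Future.get() with no arguments blocks until the future completes, so that call never times out by itself. A bounded wait is one way to turn the hang into a visible timeout. The sketch below is not taken from the repo: BoundedGet and getOrNull are hypothetical names, and a plain CompletableFuture that is never completed stands in for the future returned by send, so the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, for illustration only.
final class BoundedGet {
    /** Waits at most timeoutMs for the future; returns null if it is still pending. */
    public static <T> T getOrNull(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // still pending after timeoutMs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by send(); it is never completed.
        Future<Integer> pending = new CompletableFuture<>();
        System.out.println("pending: " + getOrNull(pending, 200));

        // An already-completed future returns its value immediately.
        Future<Integer> done = CompletableFuture.completedFuture(0);
        System.out.println("done: " + getOrNull(done, 200));
    }
}
```

In the producer above, the same pattern would apply to the future returned by send, for example metadataFuture.get(10, TimeUnit.SECONDS), where the 10-second figure is an arbitrary choice. This only bounds the wait; it does not explain why the send never completes.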
I'm running Kafka 2.1.0 on Scala 2.12 (Dockerfile), and I'm using the kafka-clients 2.1.0 library. I'm running Kafka dockerized using Docker Compose. Here's my docker-compose.yml:
version: "2"
services:
  kafkaserver:
    image: "kafka"
    container_name: kafka
    hostname: kafkaserver
    networks:
      - kafkanet
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      ADVERTISED_HOST: "kafkaserver"
      ADVERTISED_PORT: "9092"
networks:
  kafkanet:
    driver: bridge
The repo is hosted here.
What would cause this behavior?
EDIT:
After adding log4j and the slf4j-log4j12 bridge, I now see the following error in my logs, over and over and over again:
[2018-12-17 17:39:44,885] ERROR [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender)
java.lang.IllegalStateException: No entry found for connection 0
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
    at java.base/java.lang.Thread.run(Thread.java:834)
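
The trace passes through NetworkClient.initiateConnect, so one thing that may be worth ruling out is whether the host name the broker advertises resolves from the machine running the producer. The following is a minimal sketch of such a check, under the assumption that the producer runs on the Docker host rather than inside the kafkanet network. ResolveCheck and canResolve are hypothetical names; "kafkaserver" is the ADVERTISED_HOST value from the docker-compose.yml above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical diagnostic helper, for illustration only.
final class ResolveCheck {
    /** Returns true if the host name resolves to an address from this machine. */
    public static boolean canResolve(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "localhost" is the bootstrap host used by the producer;
        // "kafkaserver" is the advertised host from docker-compose.yml.
        for (String host : new String[] {"localhost", "kafkaserver"}) {
            System.out.println(host + " resolves: " + canResolve(host));
        }
    }
}
```

If the advertised name does not resolve from the client machine, that mismatch would be a candidate explanation to investigate; the check alone does not confirm it is the cause of the exception.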