
I have the following Kafka producer:

package dathanb;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class Producer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);

        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            System.out.println(kafkaProducer.partitionsFor("kafka-test"));
            for (int i = 0; i < 1000; i++) {
                System.out.println(i);
                var metadataFuture = kafkaProducer.send(new ProducerRecord<>("kafka-test", 0, null, "test message - " + i), callback());
                System.out.println(metadataFuture.get().partition());
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Callback callback() {
        return (metadata, exception) -> {
            System.out.println(metadata);
            System.out.println(exception);
        };
    }
}

The `kafkaProducer.partitionsFor("kafka-test")` call prints what looks like the right partition config: [Partition(topic = kafka-test, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])], but the `println` of the send result's partition never prints anything, and the callback is never called -- the application appears to hang indefinitely. I don't even get a timeout, which I would expect.
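Independent of the root cause, the no-argument `metadataFuture.get()` blocks the main thread forever. A minimal sketch of the bounded overload `Future.get(timeout, unit)`, using a plain `CompletableFuture` standing in for the producer's future (the class name `BoundedGet` is made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGet {
    public static void main(String[] args) throws Exception {
        // A future that never completes, standing in for the
        // Future<RecordMetadata> returned by send() when the broker
        // never acknowledges the record.
        CompletableFuture<Object> stuck = new CompletableFuture<>();
        try {
            // Bounded wait: surfaces the hang as a TimeoutException
            // instead of blocking indefinitely.
            stuck.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out waiting for ack");
        }
    }
}
```

In the producer above, something like `metadataFuture.get(10, TimeUnit.SECONDS)` would at least turn the silent hang into a visible `TimeoutException`.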

I'm running Kafka 2.1.0 on Scala 2.12 (Dockerfile), dockerized via Docker Compose, and I'm using the kafka-clients 2.1.0 library. Here's my docker-compose.yml:

version: "2"
services:
  kafkaserver:
    image: "kafka"
    container_name: kafka
    hostname: kafkaserver
    networks:
      - kafkanet
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      ADVERTISED_HOST: "kafkaserver"
      ADVERTISED_PORT: "9092"
networks:
  kafkanet:
    driver: bridge

The repo is hosted here.

What would cause this behavior?

EDIT:

After adding log4j and the slf4j-log4j12 bridge, I now see the following error in my logs, over and over and over again:

    [2018-12-17 17:39:44,885] ERROR [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
    java.lang.IllegalStateException: No entry found for connection 0
        at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
        at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921)
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
        at java.base/java.lang.Thread.run(Thread.java:834)
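(For reference, a minimal `log4j.properties` sketch that surfaces these client logs on the console, assuming log4j 1.x with the slf4j-log4j12 bridge on the classpath:)

```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
```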
Dathan
    Well, for starters, you'll want to expose `advertised.listeners`. Please read https://rmoff.net/2018/08/02/kafka-listeners-explained/ – OneCricketeer Dec 18 '18 at 03:07
  • Yup. Also, there is no reason to fork the spotify images. The other kafka images on DockerHub with more downloads are being updated regularly – OneCricketeer Dec 18 '18 at 05:58
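Following the comment above about `advertised.listeners`: the broker advertises `kafkaserver:9092`, which resolves only inside the Docker network, so a producer on the host can bootstrap but then fails to connect to the advertised address. A sketch of the listener environment block, assuming the image accepts `KAFKA_LISTENERS`/`KAFKA_ADVERTISED_LISTENERS`-style variables (the exact names depend on the image):

```yaml
    environment:
      # Bind on all interfaces inside the container.
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
      # Advertise an address resolvable from where the producer runs;
      # "kafkaserver" is only resolvable inside the Docker network.
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
```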

0 Answers