0

looking for help as I have been stuck for a week now. I have a Kafka (0.8.2.1_11) and Zookeeper instance running inside of docker containers, and I am trying to get a Java app to write data to Kafka. I see that my topic is created, but no data is being saved.

My Java app is pretty straightforward, for loop that writes the iteration number. I also included a snippet of my docker-compose.yml file for the two services.

I start up my Kafka and Zookeeper by issuing a docker-compose -f docker-compose.yml up -d command. Then I run my Java file through intelliJ

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class MainFake {

    private static final Logger LOGGER = LoggerFactory.getLogger(MainFake.class);
    private static Properties kafkaProperties = new Properties();

    static {

        // set up Kafka
        kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        kafkaProperties.put("retries", 0);
        kafkaProperties.put("batch.size", 16384);
        kafkaProperties.put("linger.ms", 1);
        kafkaProperties.put("buffer.memory", 33554432);
    }

    public static void main(String[] args) {
        LOGGER.info("Started fake Main");
        KafkaProducer kafkaProducer = new KafkaProducer(kafkaProperties);
        try {
            for (int i = 0; i < 30; i++) {
                LOGGER.info("logging " + i);
                String kafkaPayload = String.valueOf(i);
                ProducerRecord<String, String> record = new ProducerRecord<>("test", generateKey(kafkaPayload), kafkaPayload);
                LOGGER.info("sending...");
                kafkaProducer.send(record).get();
                LOGGER.info("sent.");
            }
        } catch (Exception e) {
            LOGGER.error(String.valueOf(e));
        } finally {
            LOGGER.info("closing producer...");
            kafkaProducer.close();
            LOGGER.info("closed producer.");
        }
        LOGGER.info("Finished fake Main");
    }

    private static String generateKey(String payload) {
        return String.valueOf(System.nanoTime() + payload.hashCode());
    }

}

zk:
    image: registry.cloud.cas.org/osd-platform/baseimage-zookeeper:3.4.8_11
    restart: always
    ports:
      - "2181:2181"
    networks:
      - chad

  kafka:
    image: registry.cloud.cas.org/osd-platform/kafka:0.8.2.1_11
    restart: always
    ports:
      - "9092:9093"
    depends_on:
      - zk
    environment:
      KAFKA_NAME: 1
      LOG_LEVEL: INFO
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    hostname: ${VM_HOSTNAME}
    networks:
      chad:
        aliases:
          - kafka

0 Answers0