looking for help as I have been stuck for a week now. I have a Kafka (0.8.2.1_11) and Zookeeper instance running inside of docker containers, and I am trying to get a Java app to write data to Kafka. I see that my topic is created, but no data is being saved.
My Java app is pretty straightforward, for loop that writes the iteration number. I also included a snippet of my docker-compose.yml file for the two services.
I start up my Kafka and Zookeeper by issuing a docker-compose -f docker-compose.yml up -d
command. Then I run my Java file through intelliJ
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class MainFake {
private static final Logger LOGGER = LoggerFactory.getLogger(MainFake.class);
private static Properties kafkaProperties = new Properties();
static {
// set up Kafka
kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaProperties.put(ProducerConfig.ACKS_CONFIG, "all");
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProperties.put("retries", 0);
kafkaProperties.put("batch.size", 16384);
kafkaProperties.put("linger.ms", 1);
kafkaProperties.put("buffer.memory", 33554432);
}
public static void main(String[] args) {
LOGGER.info("Started fake Main");
KafkaProducer kafkaProducer = new KafkaProducer(kafkaProperties);
try {
for (int i = 0; i < 30; i++) {
LOGGER.info("logging " + i);
String kafkaPayload = String.valueOf(i);
ProducerRecord<String, String> record = new ProducerRecord<>("test", generateKey(kafkaPayload), kafkaPayload);
LOGGER.info("sending...");
kafkaProducer.send(record).get();
LOGGER.info("sent.");
}
} catch (Exception e) {
LOGGER.error(String.valueOf(e));
} finally {
LOGGER.info("closing producer...");
kafkaProducer.close();
LOGGER.info("closed producer.");
}
LOGGER.info("Finished fake Main");
}
private static String generateKey(String payload) {
return String.valueOf(System.nanoTime() + payload.hashCode());
}
}
zk:
image: registry.cloud.cas.org/osd-platform/baseimage-zookeeper:3.4.8_11
restart: always
ports:
- "2181:2181"
networks:
- chad
kafka:
image: registry.cloud.cas.org/osd-platform/kafka:0.8.2.1_11
restart: always
ports:
- "9092:9093"
depends_on:
- zk
environment:
KAFKA_NAME: 1
LOG_LEVEL: INFO
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_LISTENERS: INTERNAL://kafka:9093,OUTSIDE://localhost:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
hostname: ${VM_HOSTNAME}
networks:
chad:
aliases:
- kafka