
I have a single-node Zookeeper / Kafka setup with three brokers. I am using the Kafka 0.10 Java client.

I wrote the following simple remote producer (it runs on a different server than Kafka; in the code I replaced my public IP address with MYIP):

Properties config = new Properties();
try {
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    producer = new KafkaProducer<String, byte[]>(config);
    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(GATEWAY_SCHEMA);
    recordInjection = GenericAvroCodecs.toBinary(schema);
    GenericData.Record avroRecord = new GenericData.Record(schema);
    //Filling in avroRecord (code not here)
    byte[] bytes = recordInjection.apply(avroRecord);

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
    RecordMetadata data = future.get();
} catch (Exception e) {
    e.printStackTrace();
}

The server properties for the three brokers look like this. Across the three server properties files, broker.id is 0, 1, 2; listeners is PLAINTEXT://:9092, PLAINTEXT://:9093, PLAINTEXT://:9094; and host.name is 10.2.0.4, 10.2.0.5, 10.2.0.6. This is the first server properties file:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka1-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

When I execute the code, I get the following exception:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212)
    at com.nr.roles.gateway.gw.service(gw.java:126)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119)
    at org.eclipse.jetty.server.Server.handle(Server.java:517)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0

Does anyone know what I am missing? Any help would be appreciated. Thanks a lot

Armen
  • I also tried the same as above, but with only one broker (on port 9092). I still get exactly the same exception. I made sure the broker and Zookeeper ports on the remote machine are open, and I can telnet to them from the producer machine. – Armen Jul 08 '16 at 19:55

3 Answers


I encountered the same problem.

You should change the listeners entry in your Kafka server.properties to specify the IP address, e.g.:

PLAINTEXT://YOURIP:9093

Otherwise Kafka will use its hostname. If the producer cannot resolve that hostname, it cannot send messages to Kafka, even if you can telnet to the brokers.
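One way to check this from the producer machine is to test whether the host name the broker hands back actually resolves and accepts connections there. The following is only a minimal sketch using the standard java.net classes; the class name BrokerAddressCheck, its two helper methods, and the default host and port are hypothetical and not part of Kafka.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

public class BrokerAddressCheck {

    /** Returns true if the given host name (or IP literal) resolves on this machine. */
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved hosts, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder defaults; pass the host name and port your broker really uses.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + " resolves: " + resolves(host));
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If the bootstrap address (MYIP:9092) is reachable but the broker's own host name does not resolve from the producer machine, that would be consistent with the metadata timeout described in the question.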

OneCricketeer
vr3C
    I have this property set correctly in my server.properties as listeners=PLAINTEXT://domain_name:9092, but I am still getting the exception org.apache.kafka.common.errors.TimeoutException: Batch Expired (java.util.concurrent.ExecutionException) while connecting from an external server. I have increased request.timeout.ms to a higher value too. – jack AKA karthik Jan 30 '17 at 07:28

The port information in your BOOTSTRAP_SERVERS_CONFIG configuration (MYIP:9092) is incorrect.

According to your question, the listeners in server.properties are "PLAINTEXT://:9093, PLAINTEXT://:9093, PLAINTEXT://:9094".

Kamal Chandraprakash
  • Sorry, I had a typo here; in server.properties it is "PLAINTEXT://:9092, PLAINTEXT://:9093, PLAINTEXT://:9094". So the `BOOTSTRAP_SERVERS_CONFIG` ports are correct. – Armen Jul 08 '16 at 19:42

This answer shares some insight. You can increase the request.timeout.ms producer configuration, which allows the client to keep batches queued for longer before they expire.

You might also want to look into the batch.size and linger.ms configurations and find the values that work best in your case.
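As a rough illustration, these settings could be added to the producer configuration from the question like this. It is a sketch only: the class and method names are hypothetical, the values are placeholders to tune for your own setup, and plain string keys are used so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

public class ProducerTuningSketch {

    /** Builds producer settings with a longer request timeout and explicit batching values. */
    public static Properties tunedProducerConfig(String bootstrapServers) {
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("acks", "all");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Placeholder values, not recommendations: adjust them for your workload.
        config.put("request.timeout.ms", "60000"); // how long the client waits before expiring requests
        config.put("batch.size", "16384");         // upper bound on a batch, in bytes
        config.put("linger.ms", "5");              // how long to wait for more records before sending
        return config;
    }

    public static void main(String[] args) {
        Properties config = tunedProducerConfig("MYIP:9092,MYIP:9093,MYIP:9094");
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can be passed to new KafkaProducer<String, byte[]>(config) in the same way as in the question. Note that a longer timeout only helps if the brokers are reachable at all; it does not fix an address the client cannot resolve.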

Tanvir