
I am testing the Apache Kafka producer with the native Java implementation against Python's confluent-kafka to see which has the higher throughput.

I am deploying a Kafka cluster with 3 Kafka brokers and 3 ZooKeeper instances using docker-compose. My docker-compose file: https://paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw

The code is very simple, with mostly default options for Python's confluent-kafka and some config changes in the Java producer to match confluent-kafka's.

Python Code:

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092',
    'linger.ms': 300,
    'max.in.flight.requests.per.connection': 1000000,
    'queue.buffering.max.kbytes': 1048576,
    'message.max.bytes': 1000000,
    'default.topic.config': {'acks': 'all'}
})

ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'

def f():
    import time
    start = time.time()
    for i in range(1000000):
        try:
            producer.produce('test-topic', ss)
        except Exception:
            producer.poll(1)
            try:
                producer.produce('test-topic', ss)
            except Exception:
                producer.flush(30)
                producer.produce('test-topic', ss)
        producer.poll(0)
    producer.flush(30)
    print(time.time() - start)


if __name__ == '__main__':
    f()
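
Edenhill's suggestion of measuring from the first delivery report (to exclude startup cost) can be applied on the Python side too. Below is a minimal sketch of how I could do that; `DeliveryTimer` is a hypothetical helper of mine, wired in through confluent-kafka's `callback` argument to `produce()`:

```python
import time

class DeliveryTimer:
    """Starts the clock on the first successful delivery report,
    so connection/startup cost is excluded from the measurement."""

    def __init__(self):
        self.first_ack = None
        self.acked = 0

    def on_delivery(self, err, msg):
        # confluent-kafka invokes this from poll()/flush() per message
        if err is not None:
            print('Delivery failed: {}'.format(err))
            return
        if self.first_ack is None:
            self.first_ack = time.time()
        self.acked += 1

    def elapsed(self):
        return None if self.first_ack is None else time.time() - self.first_ack

# Wiring sketch, using the producer from the code above:
#   timer = DeliveryTimer()
#   producer.produce('test-topic', ss, callback=timer.on_delivery)
#   ...
#   producer.flush(30)
#   print(timer.elapsed(), timer.acked)
```

This mirrors what the Java callback below does with `new_time[0]`.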

The Java implementation. The configuration matches librdkafka's; I changed linger.ms and added the callback as suggested by Edenhill.

package com.amit.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExampleAsync {

    private final static String TOPIC = "test-topic";
    private final static String BOOTSTRAP_SERVERS = "kafka-1:19092,kafka-2:29092,kafka-3:39092";

    private static Producer<String, String> createProducer() {
        int bufferMemory = 67108864;
        int batchSizeBytes = 1000000;
        String acks = "all";

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // must match Producer<String, String>
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSizeBytes);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1000000);
        props.put(ProducerConfig.ACKS_CONFIG, acks);

        return new KafkaProducer<>(props);
    }

    static void runProducer(final int sendMessageCount) throws InterruptedException {
        final Producer<String, String> producer = createProducer();
        final String msg = "0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357";

        final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg);
        final long[] new_time = new long[1];

        try {
            for (long index = 0; index < sendMessageCount; index++) {
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // Record the first successful acknowledgement so startup cost is excluded from the timing
                        if(e != null) {
                           e.printStackTrace();
                        } else {
                            if (new_time[0] == 0) {
                                new_time[0] = System.currentTimeMillis();
                            }
                        }
                    }
                });
            }
        } finally {
            // producer.flush();
            producer.close();
            System.out.printf("Total time %d ms\n", System.currentTimeMillis() - new_time[0]);
        }
    }

    public static void main(String... args) throws Exception {
        if (args.length == 0) {
            runProducer(1000000);
        } else {
            runProducer(Integer.parseInt(args[0]));
        }
    }
}

Benchmark results (edited after making the changes recommended by Edenhill):

Acks = 0, Messages: 1000000

Java: 12.066 seconds

Python: 9.608 seconds

Acks = all, Messages: 1000000

Java: 11.917 seconds (45.763 before the changes)

Python: 14.3029 seconds


The Java implementation is performing about the same as the Python implementation, even after making all the changes I could think of plus the ones suggested by Edenhill in the comments below.

There are various benchmarks of Kafka performance from Python, but I couldn't find any comparing librdkafka or the Python client against the Apache Kafka Java client.

I have two questions:

  1. Is this test enough to conclude that, with default configs and messages of size 1 KB, librdkafka is faster?

  2. Does anyone have experience with, or a source (blog, doc, etc.) for, benchmarking librdkafka against the Apache Kafka Java client?

Amit Tripathi
    librdkafka is probably faster than the Java counterpart, but the Python client is not due to the large overhead of Python object creation. You will probably want to set "linger.ms" on both clients to something like 50 ms, and start measuring from when you receive the first message delivery acknowledgement / delivery report, to not include startup costs in your measurements. – Edenhill Mar 05 '19 at 21:20
  • What would be the equivalent of queue.buffering.max.messages in the Kafka producer API? – JR ibkr Mar 06 '19 at 15:09
  • I'm not sure there is a queue limit in the Java Producer. – Edenhill Mar 08 '19 at 09:08
  • I also couldn't find the queue.buffering.max.messages option in the Java producer API. It used to be there at least until v0.8 https://kafka.apache.org/08/documentation.html – Amit Tripathi Mar 09 '19 at 19:37
  • @Edenhill I made the changes you suggested and saw a big speed bump in case of Java with acks='all' case but it's still not faster than Python producer by much. As I increased message count Python performed better ¯\_(ツ)_/¯. I have updated the question – Amit Tripathi Mar 10 '19 at 16:50

1 Answer


The Python client uses librdkafka, which overrides some of Kafka's default configuration.

Parameter = Kafka default
batch.size = 16384
max.in.flight.requests.per.connection = 5 (librdkafka's default is 1000000)

message.max.bytes in librdkafka may be the equivalent of max.request.size in the Java client.

I think there is no equivalent of librdkafka's queue.buffering.max.messages in Kafka's Java producer API. If you find something, correct me.

Also, remove buffer.memory parameter from Java program.

https://kafka.apache.org/documentation/#producerconfigs https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
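
To summarize, here is my reading of how the librdkafka settings used in the question line up with the closest Java ProducerConfig names. The pairings are approximate interpretations of the two documentation pages linked above, not an authoritative table:

```python
# librdkafka (confluent-kafka) setting -> closest Java ProducerConfig name,
# or None where I could not find a direct equivalent. Approximate only.
librdkafka_to_java = {
    'linger.ms': 'linger.ms',
    'acks': 'acks',
    'max.in.flight.requests.per.connection': 'max.in.flight.requests.per.connection',
    'message.max.bytes': 'max.request.size',        # both cap a single request's size
    'queue.buffering.max.kbytes': 'buffer.memory',  # note the KB vs bytes difference
    'batch.num.messages': None,                     # Java's batch.size is in bytes, not messages
    'queue.buffering.max.messages': None,           # no Java equivalent found
}
```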

Next, Java takes some time to load classes and warm up, so you need to increase the number of messages your producer produces. It would be great if the run took at least 20-30 minutes to produce all messages. Then you can compare the Java client with the Python client.
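
Another way to control for startup cost without a 20-30 minute run is to send an untimed warm-up batch first. A sketch with a hypothetical `timed_run` helper (the callables stand in for the real producer methods from the question):

```python
import time

def timed_run(produce_one, poll, flush, n, warmup=10000):
    """Send `warmup` messages untimed so connection setup and other
    startup cost is excluded, then time the remaining `n` sends."""
    for _ in range(warmup):
        produce_one()
        poll(0)
    flush(30)

    start = time.time()
    for _ in range(n):
        produce_one()
        poll(0)
    flush(30)
    return time.time() - start

# Usage sketch with the producer from the question:
#   elapsed = timed_run(lambda: producer.produce('test-topic', ss),
#                       producer.poll, producer.flush, 1000000)
```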

I like the idea of a comparison between the Python and Java clients. Keep posting your results on Stack Overflow.

JR ibkr
  • It's the same: ACKS_CONFIG = acks, which I am setting to 0 or all. I think `batch.size` is the same as `batch.num.messages` in librdkafka, which I am setting via `batchSize = 10000`, and `props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1000000)` handles max in-flight. It's the same as the librdkafka config https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md – Amit Tripathi Mar 06 '19 at 13:00
  • I think there is one issue here: I was treating `message.max.bytes` in librdkafka as `BATCH_SIZE_CONFIG` in Apache Kafka, i.e. using it as the batch.num.messages parameter. BATCH_SIZE_CONFIG controls the batch size in bytes, not the message count. Will update the question with new results – Amit Tripathi Mar 06 '19 at 13:16