
1. In my application, which sends data over a TCP connection (a Kafka producer), I observed a drastic performance drop when the message size grows from 1 MB to 100 MB: throughput falls from 140 MB/sec to 25 MB/sec (batch size = 1).

I profiled the producer process and found one suspicious point: the method `copyFromArray` in `Bits.java` consumes most of the time. (The code is as follows.)

// Copy at most this many bytes per Unsafe.copyMemory call; the JDK imposes
// the limit to allow safepoint polling during a large copy.
static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L;

static void copyFromArray(Object src, long srcBaseOffset, long srcPos,
                          long dstAddr, long length)
{
    long offset = srcBaseOffset + srcPos;
    while (length > 0) {
        long size = (length > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : length;
        // A null destination base means dstAddr is an absolute native address.
        unsafe.copyMemory(src, offset, null, dstAddr, size);
        length -= size;
        offset += size;
        dstAddr += size;
    }
}

Reference: http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/java/nio/Bits.java
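
For context, `Bits.copyFromArray` is the path a heap `byte[]` takes when it is written into a direct ByteBuffer, which is what the NIO socket send path typically does. Here is a minimal standalone sketch that exercises the same code path (the class name and sizes are illustrative):

import java.nio.ByteBuffer;

public class CopyHotspot {
    public static void main(String[] args) {
        byte[] message = new byte[100 * 1024 * 1024]; // 100 MB heap array
        ByteBuffer direct = ByteBuffer.allocateDirect(message.length);

        long t0 = System.nanoTime();
        // DirectByteBuffer.put(byte[]) delegates to Bits.copyFromArray,
        // copying the whole array into native memory in 1 MB chunks.
        direct.put(message);
        long t1 = System.nanoTime();
        System.out.printf("copied %d MB in %.1f ms%n",
                message.length >> 20, (t1 - t0) / 1e6);
    }
}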

2. Interestingly, this problem occurs only when I use the Java implementation of the producer client; it does not occur with the Scala implementation, which I cannot understand.

Where should I start looking to find the cause of the problem?

syko
  • `mb` = milli-bits, `MB` = Mega-Bytes. It's not unusual to be spending most of the time copying data if you have very large messages. I would have thought Scala calls the same code as the Java client. – Peter Lawrey Mar 28 '16 at 12:01
  • @PeterLawrey (I fixed mb --> MB, thanks for this.) The Scala client calls exactly the same method as the Java client does, according to the profiling results. – syko Mar 28 '16 at 12:04
  • So you are right that it should perform exactly the same way, as it is running the same code. – Peter Lawrey Mar 28 '16 at 12:05
  • If you're just copying between two arrays of the same type, wouldn't it be faster to copy in blocks of UNSAFE_COPY_THRESHOLD? – cup Mar 28 '16 at 12:07
  • The problem might be the message size itself. TCP packet sizes apparently vary based on various factors and your producer might have to spend a lot of time chopping your message down into a lot of smaller packets in order to send the data safely. http://stackoverflow.com/questions/2613734/maximum-packet-size-for-a-tcp-connection – NAMS Mar 28 '16 at 15:19
  • @NAMS, I agree that there must be overhead in chopping and copying data for TCP. But why does it not happen in the Scala code? – syko Mar 28 '16 at 16:20
  • @syko I'm not familiar enough with scala to be able to answer that. I would check to make sure that the scala implementation is sending the message properly first (not truncated, etc.), and if it is, then it must be something in the underlying implementation that allows scala to perform more optimally in this scenario. I can't imagine what it would be, though. – NAMS Mar 28 '16 at 16:46

1 Answer


Kafka's optimal message size is around 1 KB. Once your message size grows past about 10 MB, you start to suffer performance problems. In your case, the message size is around 100 MB; that's definitely a no-no.
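
As an aside, getting 100 MB messages through at all means the size limits must already have been raised well past their roughly 1 MB defaults on every hop. A sketch of the settings typically involved (names as in the Kafka documentation; the values here are illustrative, not recommendations):

# broker (server.properties): largest accepted message; replicas must be able to fetch it too
message.max.bytes=104857600
replica.fetch.max.bytes=104857600

# producer: largest request, and the record buffer must hold at least one full message
max.request.size=104857600
buffer.memory=134217728

# consumer (new Java consumer)
max.partition.fetch.bytes=104857600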

You have to ask yourself whether sending such a big message is necessary. Kafka is an event pub-sub system, not an FTP server. If you need to send a large file, you can put the file in a shared location and just send the URL as the message through Kafka. If that does not work, another workaround is to code your producer to break large messages into multiple pieces with the same key (see the sketch below). That way you guarantee that pieces with the same key end up on the same partition, in order, so you can reassemble the message on the consumer side. Using compression will also reduce the size of your messages and improve performance.
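
A minimal sketch of that chunking approach, assuming the new Java producer API; the topic name, key, chunk size, and the small (index, total) header prepended to each value are illustrative, not a fixed protocol:

import java.nio.ByteBuffer;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ChunkingProducer {

    // Keep each Kafka message well below the broker's message.max.bytes limit.
    static final int CHUNK_SIZE = 512 * 1024;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("compression.type", "snappy"); // shrink the bytes on the wire

        byte[] payload = new byte[100 * 1024 * 1024]; // stand-in for the large message

        try (Producer<String, byte[]> producer = new KafkaProducer<>(props)) {
            String key = "file-42"; // same key -> same partition -> chunks stay in order
            int totalChunks = (payload.length + CHUNK_SIZE - 1) / CHUNK_SIZE;
            for (int i = 0; i < totalChunks; i++) {
                int from = i * CHUNK_SIZE;
                int len = Math.min(CHUNK_SIZE, payload.length - from);
                // Prepend (index, totalChunks) so the consumer can reassemble.
                ByteBuffer value = ByteBuffer.allocate(8 + len);
                value.putInt(i).putInt(totalChunks).put(payload, from, len);
                producer.send(new ProducerRecord<>("big-files", key, value.array()));
            }
        }
    }
}

The consumer side buffers chunks per key and concatenates them once all totalChunks pieces have arrived; because they share a partition, they arrive in order.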

In short, you should avoid sending large messages (>10 MB) through Kafka.

Lan
  • I am sure that Kafka is not meant for transferring large messages, and I am not blaming Kafka for this. I just want to know why the Scala and Java code behave differently. You can ask whether this issue really has anything to do with Apache Kafka's architecture; I don't fully agree that it does. – syko Mar 29 '16 at 04:29
  • @syko Sorry, I did not understand the purpose of your question. As far as I know, the Scala producer implementation is the old API and the Java producer is the new one. I have no explanation for why they perform differently; the reason could be anything: bugs, configuration defaults changing between releases, your own code, etc. The statement that you should avoid large messages in Kafka stays true. – Lan Mar 29 '16 at 14:32