
I'm trying to write a large message (around 15 MB) into Kafka, and it doesn't get written: the program finishes as if everything is OK, but there's no message in the topic.

Small messages do get written.

Here's the code:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    private final static String TOPIC = "rpdc_21596_in2";
    private final static String BOOTSTRAP_SERVERS = "host:port";

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        props.put("test.whatever", "fdsfdsf");

        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC,
                        0,                // partition
                        123L,             // timestamp
                        "fdsfdsdsdssss",  // key
                        new String(Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")))  // value
                );
        KafkaProducer<String, String> producer = createProducer();
        RecordMetadata recordMetadata = producer.send(record).get();
        producer.flush();
        producer.close();

        System.out.println(recordMetadata);
    }
}

The topic has been configured to accept big messages; I've been able to write into it with Python. Here's the Python code:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['host:port'], max_request_size=20971520, request_timeout_ms=100000)

with open('/Users/user/Desktop/value1.json', 'rb') as f:
    lines = f.read()
    print(type(lines))

    # produce keyed messages to enable hashed partitioning
    future = producer.send('rpdc_21596_in2', key=b'foo', value=lines)

    # Block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=50)
    except KafkaError:
        # Decide what to do if produce request failed...
        pass

    # Successful result returns assigned partition and offset
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


    producer.flush()

But the Java version doesn't work.

  • What does recordMetadata say? Is it null? – Thomas Raffelsieper Jan 12 '22 at 10:17
  • @ThomasRaffelsieper no, it is not null. – pavel_orekhov Jan 12 '22 at 10:19
  • Try checking the response with a callback (expanded in the sketch below): producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { /* if exception is null, the record was sent successfully */ } }); – Thomas Raffelsieper Jan 12 '22 at 10:40
  • @ThomasRaffelsieper yep, it's working. The problem wasn't the code, it was Offset Explorer: it wasn't showing my data, yet in the properties of the topic I just noticed that the number of messages does increase... Thanks man. – pavel_orekhov Jan 12 '22 at 11:13
  • Kafka is not meant to send files. If this is what you're doing, then "referential messaging" is frequently recommended: for example, upload the data to HDFS/S3 shared filesystems, then send _that path_ in your events. Any consumer would then download the file. – OneCricketeer Jan 12 '22 at 14:51
  • @OneCricketeer everybody on our team understands that; I don't know why they're doing it, I think it's a temporary solution. – pavel_orekhov Jan 12 '22 at 15:42
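Expanded, the callback check suggested in the comments looks roughly like this (a sketch that plugs into the question's main method, using the producer and record already defined there; exception is non-null exactly when the send failed):

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The record was not written; surface the failure instead of swallowing it.
            exception.printStackTrace();
        } else {
            // The record was written; metadata holds the topic, partition and offset.
            System.out.println("Written to " + metadata.topic()
                    + " partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});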

1 Answer


You need to configure your topic appropriately when creating it: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes

$ kafka-topics.sh --create --bootstrap-server ... --topic rpdc_21596_in2 --config max.message.bytes=20971520
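If the topic already exists (as in the question), the same limit can be raised and then verified with kafka-configs.sh; a sketch, with the broker address elided as above:

$ kafka-configs.sh --bootstrap-server ... --alter --entity-type topics --entity-name rpdc_21596_in2 --add-config max.message.bytes=20971520
$ kafka-configs.sh --bootstrap-server ... --describe --entity-type topics --entity-name rpdc_21596_in2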

UPDATE:

Maybe add some more properties; I've been pushing big base64 blobs with this:

    // Only one in-flight message per Kafka broker connection
    // - max.in.flight.requests.per.connection (default 5)
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    // Set the number of retries - retries
    props.put(ProducerConfig.RETRIES_CONFIG, "3");

    // Request timeout - request.timeout.ms
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");

    // Only retry after one second.
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
   
    // set max block to one minute by default
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
   
    // set transaction timeout to one minute
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
   
    // set delivery timeout to two minutes
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

    // time to wait before sending messages out to Kafka; should not be too high
    props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
    // maximum amount of data to collect before sending a batch; with messages
    // this large you will always hit it
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");

    // these ones are not necessary but useful for your use case
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
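
For reference, a minimal sketch of a producer factory that combines the max.request.size setting from the question with the timeout and compression properties above (the class name and the exact values are illustrative, not definitive):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class LargeMessageProducerFactory {

        static KafkaProducer<String, String> create(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Allow requests up to 20 MiB; the topic's max.message.bytes must
            // also permit messages this large.
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
            // gzip shrinks large JSON payloads considerably on the wire.
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            // Give big requests more time before the producer gives up;
            // delivery.timeout.ms must be >= linger.ms + request.timeout.ms.
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
            props.put(ProducerConfig.RETRIES_CONFIG, "3");
            return new KafkaProducer<>(props);
        }
    }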