I'm trying to write a large message into kafka (around 15mb) and it doesn't get written, the program finishes as if everything is ok, but there's no message inside the topic.
Small messages do get written.
Here's the code:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Main {
private final static String TOPIC = "rpdc_21596_in2";
private final static String BOOTSTRAP_SERVERS = "host:port";
private static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
props.put("test.whatever", "fdsfdsf");
return new KafkaProducer<>(props);
}
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
ProducerRecord<String, String> record =
new ProducerRecord<String, String>(TOPIC,
0,
123L,
"fdsfdsdsdssss",
new String(Files.readAllBytes(Paths.get("/Users/user/Desktop/value1.json")))
);
KafkaProducer<String, String> producer = createProducer();
RecordMetadata recordMetadata = producer.send(record).get();
producer.flush();
producer.close();
System.out.println(recordMetadata);
}
}
The topic has been configured to accept big messages, I've been able to write into it with python. Here's the python code:
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['host:port'], max_request_size=20971520, request_timeout_ms=100000)
with open('/Users/user/Desktop/value1.json', 'rb') as f:
lines = f.read()
print(type(lines))
# produce keyed messages to enable hashed partitioning
future = producer.send('rpdc_21596_in2', key=b'foo', value=lines)
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=50)
except KafkaError:
# Decide what to do if produce request failed...
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
producer.flush()
But that java version doesn't work.