We have been testing Kafka (cloud-based) to implement a data transmission system with many producers (approximately 27,000 Linux machines), one consumer (a Spring Kafka listener), and a topic with 10 partitions. The problem is that when about 9,500 producers transmit simultaneously, CPU consumption on all nodes shoots up to 100% and the cluster goes down and stops responding. Is Kafka designed for this type of architecture, or should we be looking at other options?
Here are my settings:
Kafka cluster: 4 cloud nodes (CloudKarafka-based), 4 GB RAM + 900 GB disk per node
Producer: kafka-clients-1.1.1.jar on JDK 1.7
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

Properties config = new Properties();
config.put("bootstrap.servers", "***");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "SCRAM-SHA-256");
config.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
// note: delivery.timeout.ms was only added in kafka-clients 2.1, so 1.1.1 ignores it
config.put("delivery.timeout.ms", 0);
// a fresh transactional.id is generated on every run
config.put("transactional.id", UUID.randomUUID().toString());
config.put("enable.idempotence", true);
config.put("compression.type", "gzip");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
    String dataJson = "700 bytes json String";
    ProducerRecord<String, String> data = new ProducerRecord<>("test-topic", dataJson);
    producer.initTransactions();
    try {
        log.info("begin " + data);
        producer.beginTransaction();
        producer.send(data);
        producer.commitTransaction();
        log.info("commit " + data);
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // fatal producer errors: the only safe option is to close the producer
        log.error("kafka exception" + e);
        producer.close();
    } catch (KafkaException e) {
        // other errors: abort the transaction and let the caller retry
        log.error("kafka exception" + e);
        log.info("ABORT");
        producer.abortTransaction();
    }
} finally {
    log.info("finally");
}
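For reference, a minimal sketch of what a long-lived, non-transactional producer would look like on each machine, created once and reused for every send instead of opening a producer and a transaction per message. The connection settings are assumed to be the same SASL_SSL config as above; EdgeSender, its logger, and the acks setting are illustrative, not part of the current code:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: one producer per JVM, created at startup and reused for all sends.
public class EdgeSender implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EdgeSender.class);

    private final KafkaProducer<String, String> producer;

    public EdgeSender(Properties config) {
        // plain acks-based delivery instead of transactions (an assumption, not the original setup)
        config.put("acks", "all");
        this.producer = new KafkaProducer<>(config);
    }

    public void send(String dataJson) {
        // async send; the callback runs on the producer's I/O thread
        producer.send(new ProducerRecord<String, String>("test-topic", dataJson), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    log.error("send failed", e);
                }
            }
        });
    }

    @Override
    public void close() {
        producer.close();
    }
}

The anonymous Callback (rather than a lambda) keeps the sketch compatible with JDK 1.7.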
Spring Boot consumer config:
spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: oracle.jdbc.OracleDriver
    url: jdbc:oracle:thin:@x.x.x.251:1521/xe
    username: username
    password: password
    hikari:
      maximum-pool-size: 12
      pool-name: test-pool
  jpa:
    database-platform: org.hibernate.dialect.Oracle10gDialect
    show-sql: true
    hibernate:
      ddl-auto: none
    properties:
      hibernate:
        default_schema: hr
        type: trace
        format_sql: trace
  kafka:
    bootstrap-servers: ***********
    jaas:
      enabled: false
    consumer:
      group-id: group-id
      topic: test-topic
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaJsonDeserializer
      properties:
        security:
          protocol: SASL_SSL
        sasl:
          mechanism: SCRAM-SHA-256
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void processMessage(String dataJson,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.OFFSET) long offset) {
    try {
        log.info("json: " + dataJson);
        service.save(dataJson);
    } catch (SQLException | JsonProcessingException ex) {
        service.trackError(dataJson, ex.getLocalizedMessage());
    }
}
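For completeness: with 10 partitions the consumer group can run up to 10 consumers in parallel, but the default listener container uses a single thread. A minimal sketch of raising the concurrency to match the partition count, assuming Spring Boot's auto-configured ConsumerFactory (the configuration class below is illustrative, not part of the current setup):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Sketch only: one consumer thread per partition of test-topic.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(10); // matches the topic's 10 partitions
        return factory;
    }
}

The same effect is available declaratively via spring.kafka.listener.concurrency: 10 in the YAML above.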
Any help would be greatly appreciated. Thank you.