
We have been testing Kafka (cloud-based) to implement a data transmission system with many producers (approximately 27,000 Linux machines), one consumer (a Spring Kafka listener), and a topic with 10 partitions. The problem is that when 9,500 producers transmit simultaneously, the CPU consumption of all broker nodes shoots up to 100% and the cluster goes down and stops responding. Is Kafka designed for this type of architecture, or should we look for other options?

Here are my settings:

Kafka cluster: 4 CloudKarafka-based cloud nodes (4 GB RAM + 900 GB disk per node)

The producer: kafka-clients-1.1.1.jar + JDK 1.7

    import java.util.Properties;
    import java.util.UUID;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    Properties config = new Properties();
    config.put("bootstrap.servers", "***");
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("security.protocol", "SASL_SSL");
    config.put("sasl.mechanism", "SCRAM-SHA-256");
    config.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
    // Note: delivery.timeout.ms was only added in kafka-clients 2.1 (KIP-91);
    // kafka-clients 1.1.1 ignores it as an unknown config.
    config.put("delivery.timeout.ms", 0);
    // A fresh transactional.id per run, so every message gets its own
    // transactional session with the coordinator.
    config.put("transactional.id", UUID.randomUUID().toString());
    config.put("enable.idempotence", true); // required when transactional.id is set
    config.put("compression.type", "gzip");

    // Each machine runs this whole block per message: create a producer,
    // initTransactions(), send one ~700-byte record, commit, close.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
        String dataJson = "700 bytes json String";
        ProducerRecord<String, String> data = new ProducerRecord<>("test-topic", dataJson);

        producer.initTransactions();
        try {
            log.info("begin " + data);
            producer.beginTransaction();
            producer.send(data);
            producer.commitTransaction();
            log.info("commit " + data);
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer must be closed (try-with-resources does that).
            log.error("kafka exception " + e);
        } catch (KafkaException e) {
            log.error("kafka exception " + e);
            log.info("ABORT");
            producer.abortTransaction();
        }
    } finally {
        log.info("finally");
    }
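For comparison, a lighter-weight pattern would keep one long-lived, non-transactional producer per machine and reuse it across sends, instead of paying for a new TLS connection, initTransactions() coordinator round trips, and a commit per message. A rough sketch (illustrative only, not our current code; assumes at-least-once delivery would be acceptable):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch: one producer per process, reused for every send.
    // Same security settings as above; transactional.id and idempotence omitted.
    Properties config = new Properties();
    config.put("bootstrap.servers", "***");
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("security.protocol", "SASL_SSL");
    config.put("sasl.mechanism", "SCRAM-SHA-256");
    config.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
    config.put("acks", "all");              // durability without a transaction
    config.put("compression.type", "gzip");

    final KafkaProducer<String, String> producer = new KafkaProducer<>(config);

    // Called once per message instead of creating a new producer each time
    // (anonymous class rather than a lambda, since we are on JDK 1.7):
    producer.send(new ProducerRecord<>("test-topic", dataJson), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                log.error("send failed", exception);
            }
        }
    });

    // producer.close() only on application shutdown, not per message.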

Spring Boot consumer config:

spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: oracle.jdbc.OracleDriver
    url: jdbc:oracle:thin:@x.x.x.251:1521/xe
    username: username
    password: password
    hikari:      
      maximum-pool-size: 12      
      pool-name: test-pool
  jpa:
    database-platform: org.hibernate.dialect.Oracle10gDialect
    show-sql: true
    hibernate:
      ddl-auto: none    
    properties:
      hibernate:
        default_schema: hr
        format_sql: true
  kafka:
    bootstrap-servers: ***********
    consumer:
      group-id: group-id
      topic: test-topic  # custom property, resolved by the placeholder in @KafkaListener
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: SCRAM-SHA-256
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
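One thing this config does not set is listener concurrency, which defaults to 1 in Spring Kafka, so a single thread drains all 10 partitions. A sketch of matching it to the partition count (assuming the standard spring.kafka.listener.concurrency property is available in our Spring Boot version):

spring:
  kafka:
    listener:
      concurrency: 10  # one consumer thread per partition

The listener itself: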

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void processMessage(String dataJson, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.OFFSET) long offset) {

        try {
            log.info("json: " + dataJson);
            service.save(dataJson);
        } catch (SQLException | JsonProcessingException ex) {
            // persist the failure alongside the raw payload
            service.trackError(dataJson, ex.getLocalizedMessage());
        }
    }
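For reference, List-typed headers such as RECEIVED_PARTITION_ID as List<Integer> only apply to batch listeners, not to a single-record listener like the one above. A sketch of the batch variant, assuming spring.kafka.listener.type is set to batch (available in recent Spring Boot versions):

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void processBatch(List<String> dataJsons,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
            @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        // One poll's worth of records arrives as parallel lists.
        for (String dataJson : dataJsons) {
            try {
                service.save(dataJson);
            } catch (SQLException | JsonProcessingException ex) {
                service.trackError(dataJson, ex.getLocalizedMessage());
            }
        }
    }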

Any help would be greatly appreciated. Thank you.

  • Kafka should be able to handle that. Not being an expert, I can't tell you exactly what to do, but my assumption would be that your setup is just too small for this kind of load. – Thomas Jul 13 '21 at 06:55
  • Which nodes go to 100% cpu? The producers or consumers? Also, with 27k producers, you're well into the territory where you need professional support from Confluent. – Software Engineer Jul 13 '21 at 07:02
  • Hello @Software Engineer, all nodes. Apparently the problem is the number of producers; previously we tested with 20 producers sending large volumes of data (100k messages per second) and CPU consumption never exceeded 10%. Any ideas? – GreenLemonade Jul 13 '21 at 08:26
  • I'm not clear - so the producers, and the brokers, and the consumers all go to 100% CPU? Or just the brokers? or just the producers? – Robin Moffatt Jul 13 '21 at 09:08
  • Only the brokers – GreenLemonade Jul 13 '21 at 13:04

0 Answers