
We are using Kafka to stream inserts into PostgreSQL, because the message flow is too high for direct inserts. The consumer generally works well, but it occasionally gets stuck and I can't find the root cause.
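
To illustrate what "avoiding direct inserts" means here: records are collected from Kafka and written to PostgreSQL in batches. The sketch below only shows that general JDBC pattern; the table, columns and `Payload` getters are placeholders, and the actual `PgInsert.insertIntoPg` used in the consumer code further down is not shown.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

class BatchInsertSketch {
    // Hypothetical batch insert; the real PgInsert.insertIntoPg is not shown in the question.
    static void insertBatch(List<Payload> payloads, Connection conn) throws SQLException {
        String sql = "INSERT INTO payload_table (id, body) VALUES (?, ?)"; // placeholder table/columns
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Payload p : payloads) {
                ps.setLong(1, p.getId());        // assumed getters on Payload
                ps.setString(2, p.getBody());
                ps.addBatch();
            }
            ps.executeBatch();                   // one round trip for the whole batch
        }
    }
}
```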

The consumer has been running for about 6 months and has already consumed billions of records. I don't understand why it has started getting stuck lately, and I don't even know where to start debugging.

Below is the code for processing the records:

```java
private static void readFromTopic(DataSource datasource, ConsumerOptions options) {

    KafkaConsumer<String, String> consumer = KafkaConsumerConfig.createConsumerGroup(options);
    Producer<Long, String> producer = KafkaProducerConfig.createKafkaProducer(options);

    if (options.isReadFromAnOffset()) {
        // assign a specific offset to consume from;
        // this only works for a single partition per consumer
        List<TopicPartition> tpartition = new ArrayList<TopicPartition>();
        tpartition.add(new TopicPartition(options.getTopicName(), options.getPartition()));
        consumer.assign(tpartition);
        consumer.seek(tpartition.get(0), options.getOffset());
    } else {
        // use auto assign partition & offsets
        consumer.subscribe(Arrays.asList(options.getTopicName()));
        log.debug("subscribed to topic {}", options.getTopicName());
    }

    List<Payload> payloads = new ArrayList<>();

    while (true) {

        // the poll timeout is how long to wait for records from the broker
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(50));

        if (records.count() != 0) {
            log.debug("poll size is {}", records.count());
        }
        Set<TopicPartition> partitions = records.partitions();
        // reading normally as per round robin and the last committed offset
        for (ConsumerRecord<String, String> r : records) {

            log.debug(" Parition : {} Offset : {}", r.partition(), r.offset());

            try {

                JSONArray arr = new JSONArray(r.value());
                for (Object o : arr) {
                    Payload p = JsonIterator.deserialize(((JSONObject) o).toString(), Payload.class);
                    payloads.add(p);
                }

                List<Payload> steplist = new ArrayList<>();
                steplist.addAll(payloads);

                // Run a task specified by a Runnable Object asynchronously.
                CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {

                        try {
                            Connection conn = datasource.getConnection();

                            PgInsert.insertIntoPg(steplist, conn, consumer, r, options.getTopicName(),
                                    options.getErrorTopic(), producer);

                        } catch (Exception e) {

                            log.error("error in processing future {}", e);
                        }

                    }
                }, executorService);

                // used to combine all futures
                allfutures.add(future);

                payloads.clear();

            } catch (Exception e) {

                // pushing into new topic for records which have failed
                log.debug("error in kafka consumer {}", e);
                ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(options.getErrorTopic(),
                        r.offset(), r.value());
                producer.send(record);

            }
        }

        // committing after every poll
        consumer.commitSync();

        if (records.count() != 0) {
            Map<TopicPartition, OffsetAndMetadata> metadata = consumer.committed(partitions);

            // reading the committed offsets for each partition after polling
            for (TopicPartition tpartition : partitions) {
                OffsetAndMetadata offsetdata = metadata.get(tpartition);

                if (offsetdata != null && tpartition != null)
                    log.debug("committed offset is " + offsetdata.offset() + "  for topic partition "
                            + tpartition.partition());
            }
        }

        // waiting for all threads to complete after each poll
        try {
            waitForFuturesToEnd();
            allfutures.clear();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}
```
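
The loop above also references `executorService`, `allfutures` and `waitForFuturesToEnd()`, which are class-level members not shown here. They look roughly like the sketch below; the pool size and the exact waiting logic are illustrative, not the real declarations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative declarations for the members referenced in readFromTopic;
// the real class may size the pool differently or track futures another way.
private static final ExecutorService executorService = Executors.newFixedThreadPool(4);
private static final List<CompletableFuture<Void>> allfutures = new ArrayList<>();

// Block until every insert scheduled during the current poll has finished.
private static void waitForFuturesToEnd() throws InterruptedException, ExecutionException {
    CompletableFuture.allOf(allfutures.toArray(new CompletableFuture[0])).get();
}
```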

Earlier I thought the reason for it getting stuck was the size of the records being consumed, so I reduced MAX_POLL_RECORDS_CONFIG to 10. This ensures the records fetched in one poll never exceed 200 KB, since each record can be at most 20 KB.
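
For reference, that change amounts to a consumer property like the snippet below (presumably set inside `KafkaConsumerConfig.createConsumerGroup`, which is not shown). I have also included `max.poll.interval.ms` only to show which setting bounds how long a single poll-process-commit iteration may take before the broker considers the consumer failed and rebalances the group; the value shown is the Kafka default.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class ConsumerPropsSketch {
    static Properties pollLimits() {
        Properties props = new Properties();
        // at most 10 records per poll (10 * 20 KB = 200 KB per batch)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        // upper bound on the time between two poll() calls; exceeding it makes the
        // broker evict the consumer from the group (Kafka default: 300000 ms = 5 min)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        return props;
    }
}
```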

I am thinking of moving to the Spring framework to resolve this, but before that I would like to know why exactly the consumer gets stuck. Any insights on this would be helpful.
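
The Spring option I have in mind would look roughly like the sketch below (spring-kafka). The topic property, bean name and batch container factory are placeholders, and the insert logic would still have to be filled in; Spring then manages the polling, rebalances and (by default) offset commits.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PayloadBatchListener {

    // Placeholder listener: "batchContainerFactory" must be a listener container
    // factory with batch listening enabled, configured elsewhere.
    @KafkaListener(topics = "${app.topic-name}", containerFactory = "batchContainerFactory")
    public void onBatch(List<ConsumerRecord<String, String>> records) {
        // deserialize each record's JSON array into Payload objects and
        // batch-insert them into PostgreSQL, as in the hand-rolled loop above
    }
}
```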

Divyang Shah
  • It might be related to different rates of producing and consuming data in Kafka. Check this https://stackoverflow.com/a/49674534/2096986 – Felipe Nov 09 '20 at 07:02

0 Answers