We are using Kafka to stream records into PostgreSQL, since the incoming volume is too high for direct inserts. The consumer generally seems to work well, but it occasionally gets stuck and I can't find the root cause.
The consumer has been running for about 6 months and has already consumed billions of records. I don't understand why it has started getting stuck lately, and I don't even know where to start debugging.
Below is the code for processing the records:

```java
private static void readFromTopic(DataSource datasource, ConsumerOptions options) {
    KafkaConsumer<String, String> consumer = KafkaConsumerConfig.createConsumerGroup(options);
    Producer<Long, String> producer = KafkaProducerConfig.createKafkaProducer(options);
    if (options.isReadFromAnOffset()) {
        // if we want to consume from a particular offset;
        // this works only for a single partition per consumer
        List<TopicPartition> tpartition = new ArrayList<TopicPartition>();
        tpartition.add(new TopicPartition(options.getTopicName(), options.getPartition()));
        consumer.assign(tpartition);
        consumer.seek(tpartition.get(0), options.getOffset());
    } else {
        // use automatic partition and offset assignment
        consumer.subscribe(Arrays.asList(options.getTopicName()));
        log.debug("subscribed to topic {}", options.getTopicName());
    }
    List<Payload> payloads = new ArrayList<>();
    while (true) {
        // the duration is the time to wait for messages to arrive from the broker
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(50));
        if (records.count() != 0)
            log.debug("poll size is {}", records.count());
        Set<TopicPartition> partitions = records.partitions();
        // reading normally, round robin from the last committed offset
        for (ConsumerRecord<String, String> r : records) {
            log.debug(" Partition : {} Offset : {}", r.partition(), r.offset());
            try {
                JSONArray arr = new JSONArray(r.value());
                for (Object o : arr) {
                    Payload p = JsonIterator.deserialize(((JSONObject) o).toString(), Payload.class);
                    payloads.add(p);
                }
                List<Payload> steplist = new ArrayList<>();
                steplist.addAll(payloads);
                // run the insert asynchronously as a Runnable
                CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Connection conn = datasource.getConnection();
                            PgInsert.insertIntoPg(steplist, conn, consumer, r, options.getTopicName(),
                                    options.getErrorTopic(), producer);
                        } catch (Exception e) {
                            log.error("error in processing future {}", e);
                        }
                    }
                }, executorService);
                // used to combine all futures
                allfutures.add(future);
                payloads.clear();
            } catch (Exception e) {
                // push records which have failed into a separate error topic
                log.debug("error in kafka consumer {}", e);
                ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(options.getErrorTopic(),
                        r.offset(), r.value());
                producer.send(record);
            }
        }
        // committing after every poll
        consumer.commitSync();
        if (records.count() != 0) {
            Map<TopicPartition, OffsetAndMetadata> metadata = consumer.committed(partitions);
            // reading the committed offsets for each partition after polling
            for (TopicPartition tpartition : partitions) {
                OffsetAndMetadata offsetdata = metadata.get(tpartition);
                if (offsetdata != null && tpartition != null)
                    log.debug("committed offset is " + offsetdata.offset() + " for topic partition "
                            + tpartition.partition());
            }
        }
        // waiting for all insert futures to complete after each poll
        try {
            waitForFuturesToEnd();
            allfutures.clear();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
```
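For completeness, `executorService`, `allfutures` and `waitForFuturesToEnd()` are class-level members that are not shown above. They behave roughly like the sketch below (the pool size and exact declarations here are assumptions, not the real code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Assumed class-level members backing the consumer loop above.
private static final ExecutorService executorService = Executors.newFixedThreadPool(4); // pool size is a guess
private static final List<CompletableFuture<Void>> allfutures = new ArrayList<>();

// Blocks until every insert future submitted during the current poll has finished.
private static void waitForFuturesToEnd() throws InterruptedException, ExecutionException {
    CompletableFuture
            .allOf(allfutures.toArray(new CompletableFuture[0]))
            .get(); // rethrows the first failure as an ExecutionException
}
```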
Earlier I thought the reason for it getting stuck was the size of the records being consumed, so I reduced MAX_POLL_RECORDS_CONFIG to 10. This ensures that a single poll never fetches more than about 200 KB, since each record can be at most 20 KB.
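For reference, the consumer properties are built inside `KafkaConsumerConfig.createConsumerGroup()`, roughly like the sketch below. The bootstrap server, group id and auto-commit setting shown here are placeholders/assumptions; only the `MAX_POLL_RECORDS_CONFIG` value of 10 reflects the actual change described above.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "pg-insert-consumer");        // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // assumed, since offsets are committed manually via commitSync()
// Cap on records returned by a single poll(); lowered from the default 500 to 10.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```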
I am thinking of moving to the Spring framework to resolve this, but before that I would like to understand why exactly the consumer gets stuck. Any insights on this will be helpful.