I am building monitoring for my Spark Structured Streaming application and need to get the consumer lag of a particular topic that the application consumes. I believe the Spark driver must be aware of this lag, since it has all the metadata of the executors, but I can't find a way to get this metric in the existing Spark docs or other resources. I looked at the StreamingQueryListener interface, but its capabilities are limited as well: it only exposes per-query metrics.

2 Answers
The challenge with tracking the consumer lag of a Structured Streaming job is that Structured Streaming does not commit any offsets back to Kafka (see here for more details). Therefore, Kafka is not aware of the actual progress of the Structured Streaming job.
On the other hand, Spark has no insight into the number of messages, or the latest offsets, currently available in the Kafka topic.
In order to monitor the consumer lag, you need to bring those two pieces of information together:
- Continuously requesting latest offsets within a TopicPartition
- Continuously checking the current offset processed by Structured Streaming application
You could, for example, create a Kafka AdminClient and get the required information from Kafka during the onQueryProgress call of a StreamingQueryListener. In that method you then need to compare the offsets of the latest progress event with the highest offsets actually available in Kafka.
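The comparison step itself is plain arithmetic once both sets of offsets are at hand. Below is a minimal sketch of it, using only the Java standard library. The class and method names (ConsumerLag, perPartition, total) are hypothetical. It assumes you have already turned both sides into maps of partition id to offset, for instance the latest offsets from a Kafka AdminClient and the processed offsets from the end offsets reported in the query progress event. Fetching those values is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper; the names are illustrative and not part of Spark or Kafka. */
final class ConsumerLag {

    /**
     * Lag per partition = latest offset available in Kafka
     * minus the end offset the streaming query has processed.
     * Keys are partition ids of a single topic.
     */
    static Map<Integer, Long> perPartition(Map<Integer, Long> latestInKafka,
                                           Map<Integer, Long> processedBySpark) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : latestInKafka.entrySet()) {
            // Simplifying assumption: a partition the query has not reported yet
            // is treated as unread from offset 0.
            long processed = processedBySpark.getOrDefault(e.getKey(), 0L);
            // The two snapshots are taken at different times, so clamp at zero.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - processed));
        }
        return lag;
    }

    /** Sum of the per-partition lags. */
    static long total(Map<Integer, Long> lagPerPartition) {
        long sum = 0L;
        for (long v : lagPerPartition.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 1200L);
        latest.put(1, 950L);
        Map<Integer, Long> processed = new HashMap<>();
        processed.put(0, 1178L);
        processed.put(1, 950L);

        Map<Integer, Long> lag = perPartition(latest, processed);
        System.out.println(lag + " total=" + total(lag)); // {0=22, 1=0} total=22
    }
}
```

Keeping the arithmetic separate from the Kafka and Spark calls makes it easy to unit test, and the listener callback then only has to fetch the two maps and publish the result.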

Here is an approach that gets the requested information on the executor nodes. The information is fetched for every message; you can reduce the number of requests in whatever way best fits your needs (by count, by time, etc.).
Below, I send the monitoring information to another Kafka topic.
I open a Kafka consumer connection (to get the maximum offset) for each streaming batch of messages, which is quite frequent. That may be unacceptable for you.
final JavaInputDStream<ConsumerRecord<String, byte[]>> stream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams));

JavaPairDStream<String, Income> streamPair = stream
        .mapPartitionsToPair(new PairFlatMapFunction<Iterator<ConsumerRecord<String, byte[]>>, String, Income>() {

            // Settings for the extra consumer that is used only to look up offsets.
            private Map<String, Object> getProps() {
                Map<String, Object> kafkaParams2 = new HashMap<>();
                kafkaParams2.put("bootstrap.servers", ApiConsts.BOOTSTRAP_SERVERS);
                kafkaParams2.put("key.deserializer", StringDeserializer.class);
                kafkaParams2.put("value.deserializer", ByteArrayDeserializer.class);
                // Random group id so this consumer never interferes with a real consumer group.
                kafkaParams2.put("group.id", "ta_calc_spark" + UUID.randomUUID().toString());
                kafkaParams2.put("auto.offset.reset", "latest");
                kafkaParams2.put("enable.auto.commit", false);
                kafkaParams2.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
                kafkaParams2.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
                return kafkaParams2;
            }

            @Override
            public Iterator<Tuple2<String, Income>> call(Iterator<ConsumerRecord<String, byte[]>> t) throws Exception {
                // One consumer per partition of each batch; assign it to all partitions of the topic.
                KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getProps());
                ArrayList<TopicPartition> partitions0 = new ArrayList<TopicPartition>();
                IntStream.range(0, consumer.partitionsFor(ApiConsts.TOPIC_TA_CALC_SPARK_TASK).size())
                        .forEach(i -> partitions0.add(new TopicPartition(ApiConsts.TOPIC_TA_CALC_SPARK_TASK, i)));
                consumer.assign(partitions0);

                // getKafkaProducer() is a helper that is not shown in this answer.
                KafkaProducer producerMonitoring = getKafkaProducer();
                List<Tuple2<String, Income>> result = new ArrayList<Tuple2<String, Income>>();
                try {
                    t.forEachRemaining(t2 -> {
                        // business logic - message handling
                        try {
                            Set<TopicPartition> partitions = new HashSet<TopicPartition>();
                            TopicPartition actualTopicPartition =
                                    new TopicPartition(ApiConsts.TOPIC_TA_CALC_SPARK_TASK, t2.partition());
                            partitions.add(actualTopicPartition);

                            // Highest offset currently available in Kafka for this record's partition.
                            Long actualEndOffset = consumer.endOffsets(partitions).get(actualTopicPartition);
                            // Note: this consumer has no committed offsets and uses auto.offset.reset=latest,
                            // so its position resolves to the log end and "diff" stays 0 (see the output).
                            // The distance of the record being processed is actualEndOffset - (t2.offset() + 1).
                            long actualPosition = consumer.position(actualTopicPartition);

                            String monitorValue = String.format(
                                    "diff: %s (partition:%s; actualEndOffsetStreaming:%s; actualEndOffset:%s; actualPosition=%s)",
                                    actualEndOffset - actualPosition, t2.partition(), t2.offset(), actualEndOffset, actualPosition);
                            ProducerRecord<String, String> pRecord = new ProducerRecord<String, String>(
                                    ApiConsts.TOPIC_TA_CALC_SPARK_TEMP_RESULT, UUID.randomUUID().toString(), monitorValue);
                            producerMonitoring.send(pRecord);
                        } catch (Exception ex) {
                            log.error("################# mapPartitionsToPair.call() ERROR", ex);
                            ex.printStackTrace();
                        }
                    });
                } finally {
                    producerMonitoring.close();
                    consumer.close();
                }
                return result.iterator();
            }
        });
Output:
Consumer Record:(f45cd24b-6232-45b2-b8f2-814753ae89bf, diff: 0 (partition:4; actualEndOffsetStreaming:1177; actualEndOffset:1178; actualPosition=1178), 2, 109)
Consumer Record:(3ec4f576-1fff-4c91-885f-fc709f7f4531, diff: 0 (partition:4; actualEndOffsetStreaming:1176; actualEndOffset:1178; actualPosition=1178), 3, 105)
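As mentioned above, looking up offsets for every message may be too much. One way to reduce the number of requests by count or by time could look like the sketch below. LagCheckThrottle is a hypothetical helper that is not part of the code above, and the thresholds are arbitrary. The current time is passed in as a parameter so the logic can be tested without waiting.

```java
import java.time.Duration;

/** Hypothetical helper: decides whether an offset lookup is due, by record count or elapsed time. */
final class LagCheckThrottle {
    private final int everyNRecords;
    private final long minIntervalNanos;
    private int recordsSinceCheck = 0;
    private long lastCheckNanos;

    LagCheckThrottle(int everyNRecords, Duration minInterval, long nowNanos) {
        if (everyNRecords <= 0) {
            throw new IllegalArgumentException("everyNRecords must be positive");
        }
        this.everyNRecords = everyNRecords;
        this.minIntervalNanos = minInterval.toNanos();
        this.lastCheckNanos = nowNanos;
    }

    /** Call once per record; returns true when a lookup should be made now. */
    boolean shouldCheck(long nowNanos) {
        recordsSinceCheck++;
        boolean due = recordsSinceCheck >= everyNRecords
                || nowNanos - lastCheckNanos >= minIntervalNanos;
        if (due) {
            // Reset both triggers after a lookup.
            recordsSinceCheck = 0;
            lastCheckNanos = nowNanos;
        }
        return due;
    }

    public static void main(String[] args) {
        // Look up offsets every 1000 records or every 5 seconds, whichever comes first.
        LagCheckThrottle throttle = new LagCheckThrottle(1000, Duration.ofSeconds(5), System.nanoTime());
        int lookups = 0;
        for (int i = 0; i < 2500; i++) {
            if (throttle.shouldCheck(System.nanoTime())) {
                lookups++; // the offset lookup and the monitoring send would go here
            }
        }
        System.out.println("lookups: " + lookups);
    }
}
```

In the code above, such a throttle would be created once per call() and consulted inside forEachRemaining before the endOffsets request.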
