I'm trying to build a monitoring system for our Spark Thrift Server (STS). For now, logging the query, the rows retrieved/read, and the time spent would be enough.
I have implemented a custom listener, and I am able to retrieve the query and the time without problems by listening for SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd. Something like this:
// Structure to hold executionId -> (query text from the description, startTime)
val queries = new mutable.HashMap[Long, (String, Long)]

def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
  logger.info(s"----------executionId: ${event.executionId}")
  logger.info(s"----------description: ${event.description}")
  logger.info(s"----------startTime: ${event.time}")
  logger.info(s"----------metrics")
  queries.put(event.executionId, (event.description, event.time))
}

def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
  logger.info("-----onExecutionEnd")
  logger.info(s"---------executionId: ${event.executionId}")
  logger.info(s"---------endTime: ${event.time}")
  // event.qe carries the QueryExecution on recent Spark versions (note it is private[sql])
  val executedPlanMetricsMap = event.qe.executedPlan.metrics
  printMetrics(executedPlanMetricsMap, "executedPlan")
}
// Helper method to print a metrics map
def printMetrics[A](metricsMap: Map[String, A], metricType: String): Unit = {
  try {
    logger.info(s"---------metrics from $metricType with size: ${metricsMap.size}")
    metricsMap.foreach { case (k, v) =>
      logger.info(s"---------------metric from $metricType key: $k, value: $v")
    }
  } catch {
    case e: Exception => logger.info(s"---------empty $metricType")
  }
}
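For reference, these methods are dispatched from onOtherEvent, since SQL execution events have no dedicated SparkListener callback. A minimal sketch of the wiring (the class name is mine; the package placement is only needed because event.qe is private[sql]):

package org.apache.spark.sql

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

class EventMonitorCustomListener extends SparkListener {
  // SQL execution events arrive as "other" events on the listener bus
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart => onExecutionStart(start)
    case end: SparkListenerSQLExecutionEnd => onExecutionEnd(end)
    case _ => // not interested in anything else
  }
  // onExecutionStart, onExecutionEnd and printMetrics as shown above
}

The listener is registered on the Thrift Server with --conf spark.extraListeners=org.apache.spark.sql.EventMonitorCustomListener.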
I'm only printing them in the log so far, just to check whether it's possible to retrieve the values I need. Here is one example of the output:
INFO EventMonitorCustomListener: ---------executionId: 16
INFO EventMonitorCustomListener: ---------endTime: 1630665171840
INFO EventMonitorCustomListener: ---------query: select *
from <myDatabase>.<myTable>
INFO EventMonitorCustomListener: ---------metrics from executedPlan with size: 6
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numFiles, value: SQLMetric(id: 82, name: Some(number of files read), value: 1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: pruningTime, value: SQLMetric(id: 85, name: Some(dynamic partition pruning time), value: -1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: metadataTime, value: SQLMetric(id: 83, name: Some(metadata time), value: 121)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: filesSize, value: SQLMetric(id: 84, name: Some(size of files read), value: 36148)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numOutputRows, value: SQLMetric(id: 81, name: Some(number of output rows), value: 0)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numPartitions, value: SQLMetric(id: 86, name: Some(number of partitions read), value: 1)
As you can see, I get the query and, through the start and end times, the total time spent. But when I check the metrics from the executedPlan, the values are not consistent: numOutputRows is 0 when it should be a positive number (21 in my specific example). I also tried with queryExecution.executedPlan.collectLeaves().head.metrics, as I found here, with the same result.
First question: Is it possible to retrieve the number of output rows from the SparkListenerSQLExecutionEnd event?
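What I would expect to work, in case it helps frame the question: every SQLMetric is an accumulator, and metric.id matches the accumulable ids reported in stageInfo.accumulables, so a sketch like the following should be able to merge the executor-side values back into the plan's metric names at execution end (it assumes event.qe is reachable and that stage-completed events are processed before the execution-end event):

import scala.collection.mutable
import org.apache.spark.scheduler.SparkListenerStageCompleted

// Latest accumulable values from finished stages, keyed by accumulator id
private val accumValues = new mutable.HashMap[Long, Any]

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
  stageCompleted.stageInfo.accumulables.foreach { case (id, info) =>
    info.value.foreach(v => accumValues.put(id, v))
  }
}

def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
  // Resolve each plan metric through its accumulator id instead of metric.value,
  // which stays at its initial value on the driver
  event.qe.executedPlan.metrics.foreach { case (name, metric) =>
    val value = accumValues.getOrElse(metric.id, metric.value)
    logger.info(s"---------------metric $name = $value")
  }
}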
If not, I am able to retrieve them from the SparkListenerStageCompleted event, through the accumulables in stageCompleted.stageInfo.accumulables with the key "internal.metrics.input.recordsRead". But in this case I'm not able to link the stage or the job to the SQL execution: e.g. I get jobId=4 and stageId=5 for executionId=12, but there is no value in any event linking one to another.
Second question: Do you know a way to know which stage or job an execution belongs to?
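The only place I have found the execution id on the job side is the local properties attached to SparkListenerJobStart: Spark sets the key "spark.sql.execution.id" there when a job is spawned by a SQL execution. A sketch of the mapping I'm building on top of that (the executionJobs map is my own bookkeeping):

import org.apache.spark.scheduler.SparkListenerJobStart

// executionId -> (jobIds, stageIds), filled in as jobs are submitted
private val executionJobs = new mutable.HashMap[Long, (Set[Int], Set[Int])]

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
  for {
    props <- Option(jobStart.properties)
    execIdStr <- Option(props.getProperty("spark.sql.execution.id"))
  } {
    val execId = execIdStr.toLong
    val (jobs, stages) = executionJobs.getOrElse(execId, (Set.empty[Int], Set.empty[Int]))
    executionJobs.put(execId, (jobs + jobStart.jobId, stages ++ jobStart.stageIds))
  }
}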
Kind Regards
EDIT:
I found a way to link executionId with jobId, through the "spark.sql.execution.id" entry in jobStart.properties (as in the sketch above). It works fine in an STS running in Docker, but not on my real STS. Could it be something related to the STS configuration?
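To debug the difference, I'm dumping every local property that arrives with the job start event, to see whether "spark.sql.execution.id" is propagated at all on the real STS (just a diagnostic sketch):

import scala.collection.JavaConverters._

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
  // Log all local properties attached to the job to check for spark.sql.execution.id
  Option(jobStart.properties).foreach { props =>
    props.asScala.foreach { case (k, v) => logger.info(s"jobStart property: $k = $v") }
  }
}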