
I'm trying to build a monitoring system for our Spark Thrift Server (STS). For now, something like logging the query, the rows retrieved/read, and the time spent will be fine.

I have implemented a custom listener, and I am able to retrieve the query and the time without problems by listening for SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd.

Something like this:

    import scala.collection.mutable.HashMap

    import org.apache.spark.sql.execution.SQLExecution
    import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

    // Holds executionId -> (query text taken from the description, startTime)
    val queries = new HashMap[Long, (String, Long)]

    def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
      logger.info(s"----------executionId: ${event.executionId}")
      logger.info(s"----------description: ${event.description}")
      logger.info(s"----------startTime: ${event.time}")
      logger.info(s"----------metrics")

      queries.put(event.executionId, (event.description, event.time))
    }

    def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
      logger.info("-----onExecutionEnd")
      logger.info(s"---------executionId: ${event.executionId}")
      logger.info(s"---------endTime: ${event.time}")

      // Recover the query and start time stored on execution start
      queries.get(event.executionId).foreach { case (query, startTime) =>
        logger.info(s"---------query: $query")
        logger.info(s"---------durationMs: ${event.time - startTime}")
      }

      // The original snippet did not show where queryExecution came from;
      // SQLExecution.getQueryExecution is one driver-side way to obtain it,
      // but it can return null once the execution has been cleaned up.
      Option(SQLExecution.getQueryExecution(event.executionId)).foreach { queryExecution =>
        val executedPlanMetricsMap = queryExecution.executedPlan.metrics
        printMetrics(executedPlanMetricsMap, "executedPlan")
      }
    }

    // Helper method to print a metrics map
    def printMetrics[A](metricsMap: Map[String, A], metricType: String): Unit = {
      try {
        logger.info(s"---------metrics from $metricType with size: ${metricsMap.size}")
        metricsMap.foreach { case (k, v) =>
          logger.info(s"---------------metric from $metricType key: $k, value: $v")
        }
      } catch {
        // e.g. metricsMap may be null if the plan was not available
        case e: Exception => logger.info(s"---------empty $metricType")
      }
    }
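Note that these SQL events are not delivered through dedicated SparkListener callbacks; they arrive via onOtherEvent. A minimal wiring sketch (the class name matches my log output below; registering it via spark.extraListeners is an assumption, and the handlers above are assumed to be members of this class):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

    // Register with: --conf spark.extraListeners=EventMonitorCustomListener
    class EventMonitorCustomListener extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
        case e: SparkListenerSQLExecutionEnd   => onExecutionEnd(e)
        case _ => // ignore other events
      }
    }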

I'm only printing them in the log so far, just to check whether it's possible to retrieve the values I need. Here is one example output:

INFO EventMonitorCustomListener: ---------executionId: 16
INFO EventMonitorCustomListener: ---------endTime: 1630665171840
INFO EventMonitorCustomListener: ---------query: select *
    from <myDatabase>.<myTable>
INFO EventMonitorCustomListener: ---------metrics from executedPlan with size: 6
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numFiles, value: SQLMetric(id: 82, name: Some(number of files read), value: 1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: pruningTime, value: SQLMetric(id: 85, name: Some(dynamic partition pruning time), value: -1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: metadataTime, value: SQLMetric(id: 83, name: Some(metadata time), value: 121)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: filesSize, value: SQLMetric(id: 84, name: Some(size of files read), value: 36148)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numOutputRows, value: SQLMetric(id: 81, name: Some(number of output rows), value: 0)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numPartitions, value: SQLMetric(id: 86, name: Some(number of partitions read), value: 1)

As you can see, I got the query and, through the start and end times, the total time spent. But when I check the metrics from the executedPlan, the values are not consistent: numOutputRows is 0 when it should be a number (21 in my specific example). I also tried queryExecution.executedPlan.collectLeaves().head.metrics, as I found here, with the same result.

First question: Is it possible to retrieve the number of output rows from the SparkListenerSQLExecutionEnd event?

If not, I'm able to retrieve them from the SparkListenerStageCompleted event, through the accumulables in stageCompleted.stageInfo.accumulables with the key "internal.metrics.input.recordsRead". But in that case I'm not able to link the stage or the job to the SQL execution: for example, I got jobId=4 and stageId=5 for executionId=12, but there is no value in any event to link one to the other.
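A sketch of that callback (the accumulable name is the one I observe in my events; metric names may vary between Spark versions):

    import org.apache.spark.scheduler.SparkListenerStageCompleted

    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
      val info = stageCompleted.stageInfo
      // accumulables is keyed by accumulator id, so match on the metric name
      info.accumulables.values
        .find(_.name.contains("internal.metrics.input.recordsRead"))
        .foreach { acc =>
          logger.info(s"---------stageId: ${info.stageId}, recordsRead: ${acc.value.getOrElse(0L)}")
        }
    }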

Second question: Do you know a way to know which stage or job an execution belongs to?

Kind Regards

EDIT:

I found a way to link the executionId with the jobId: the execution id is stored in the job's local properties (jobStart.properties, key "spark.sql.execution.id"). It's working fine in an STS running in Docker, but not in my real STS. Could it be something related to the STS configuration?
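A sketch of that lookup (note the property is absent for jobs that were not triggered by a SQL execution, and jobStart.properties itself can be null):

    import scala.collection.mutable.HashMap

    import org.apache.spark.scheduler.SparkListenerJobStart

    // jobId -> executionId, so stages/jobs can later be joined back to the query
    val jobToExecution = new HashMap[Int, Long]

    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      Option(jobStart.properties)
        .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
        .foreach(id => jobToExecution.put(jobStart.jobId, id.toLong))
    }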

  • How did you capture the query? I'm looking for something to capture the queries for the spark job but I don't see that part in the code you shared. – Anshul Apr 18 '22 at 07:21
  • To gather the query you should use SparkListenerSQLExecutionStart: private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { val query = event.description } – SCouto Apr 19 '22 at 11:13

1 Answer


We finally managed to find out why my jobStart came without the executionId in its properties: incrementalCollect was set to true. Setting it to false fixed the issue.
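For reference, the full setting is spark.sql.thriftServer.incrementalCollect, e.g. when launching the Thrift server (assuming the standard start-thriftserver.sh script):

    ./sbin/start-thriftserver.sh \
      --conf spark.sql.thriftServer.incrementalCollect=false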
