I want to track the global failure rates for jobs/tasks/stages across all nodes in the cluster. Currently the idea is to parse the log files that the history server writes to HDFS and obtain this data from them, but this seems clunky. Are there any better approaches? Ideally I would have access to this information on the client side for each job I submit, but that doesn't seem to be available. What is the recommended way to approach this?

1 Answer

One option is to extend SparkListener and forward failure-related events to whatever metrics store you use (e.g. push the events to ELK).

Some useful events:

case class SparkListenerExecutorBlacklisted(
    time: Long,
    executorId: String,
    taskFailures: Int)
  extends SparkListenerEvent

case class SparkListenerExecutorBlacklistedForStage(
    time: Long,
    executorId: String,
    taskFailures: Int,
    stageId: Int,
    stageAttemptId: Int)
  extends SparkListenerEvent

case class SparkListenerNodeBlacklistedForStage(
    time: Long,
    hostId: String,
    executorFailures: Int,
    stageId: Int,
    stageAttemptId: Int)
  extends SparkListenerEvent

case class SparkListenerNodeBlacklisted(
    time: Long,
    hostId: String,
    executorFailures: Int)
  extends SparkListenerEvent

And the corresponding SparkListener callbacks:

def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit
def onExecutorBlacklistedForStage(executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage): Unit
def onNodeBlacklistedForStage(nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage): Unit
def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit
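As a rough sketch of how these callbacks could be used, the listener below counts blacklist events per executor and per host. The class name `BlacklistTrackingListener`, its two maps, and the `bump` helper are hypothetical names chosen for illustration; only the overridden callbacks and event fields come from the listing above. As far as I know, newer Spark releases (3.1 onward) deprecate these in favour of "Excluded" equivalents, so check the version you run.

```scala
import scala.collection.concurrent.TrieMap

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorBlacklisted,
  SparkListenerExecutorBlacklistedForStage, SparkListenerNodeBlacklisted,
  SparkListenerNodeBlacklistedForStage}

// Hypothetical listener: counts how often each executor / host gets blacklisted.
class BlacklistTrackingListener extends SparkListener {
  // executorId -> number of blacklist events (application-wide or per stage)
  val byExecutor = TrieMap.empty[String, Int]
  // hostId -> number of blacklist events (application-wide or per stage)
  val byNode = TrieMap.empty[String, Int]

  // Read-modify-write is guarded so concurrent callers cannot lose an update.
  private def bump(counts: TrieMap[String, Int], key: String): Unit = synchronized {
    counts.update(key, counts.getOrElse(key, 0) + 1)
  }

  override def onExecutorBlacklisted(
      executorBlacklisted: SparkListenerExecutorBlacklisted): Unit =
    bump(byExecutor, executorBlacklisted.executorId)

  override def onExecutorBlacklistedForStage(
      executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage): Unit =
    bump(byExecutor, executorBlacklistedForStage.executorId)

  override def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit =
    bump(byNode, nodeBlacklisted.hostId)

  override def onNodeBlacklistedForStage(
      nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage): Unit =
    bump(byNode, nodeBlacklistedForStage.hostId)
}
```

Instead of keeping the counts in memory, each callback could just as well serialize the event and ship it to an external store.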

You can register the listener through the Spark context's addSparkListener method. More details in this other Stack Overflow thread: How to implement custom job listener/tracker in Spark?
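For the failure rates asked about in the question, a minimal sketch along the same lines could rely on the general-purpose `onTaskEnd`, `onStageCompleted` and `onJobEnd` callbacks rather than the blacklist events. `Outcomes` and `FailureRateListener` are hypothetical names, and the definition of "failed" used here (any task end reason other than `Success`, any stage with a failure reason, any job result other than `JobSucceeded`) is an assumption you may want to refine.

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.Success
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd,
  SparkListenerStageCompleted, SparkListenerTaskEnd}

// Hypothetical helper: a thread-safe pair of total/failed counters.
final class Outcomes {
  private val total = new AtomicLong(0)
  private val failed = new AtomicLong(0)

  def record(isFailure: Boolean): Unit = {
    total.incrementAndGet()
    if (isFailure) failed.incrementAndGet()
  }

  // Fraction of recorded outcomes that failed; 0.0 before anything is recorded.
  def failureRate: Double = {
    val t = total.get()
    if (t == 0) 0.0 else failed.get().toDouble / t
  }
}

// Hypothetical listener: tracks failure rates at task, stage and job level.
class FailureRateListener extends SparkListener {
  val tasks = new Outcomes
  val stages = new Outcomes
  val jobs = new Outcomes

  // Assumption: every end reason other than Success counts as a failure,
  // which also includes killed tasks (e.g. losing speculative attempts).
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    tasks.record(taskEnd.reason != Success)

  // A completed stage carries a failure reason only if it failed.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    stages.record(stageCompleted.stageInfo.failureReason.isDefined)

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    jobs.record(jobEnd.jobResult != JobSucceeded)
}
```

Given an existing SparkContext `sc`, registration would look like `val listener = new FailureRateListener; sc.addSparkListener(listener)`, after which `listener.tasks.failureRate` can be read or exported periodically. Because the listener runs in the driver, the numbers cover one application; cluster-wide rates would still need each application to push its counts to a shared store.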

Note: to make it work with PySpark, follow the steps described in this other Stack Overflow thread: How to add a SparkListener from pySpark in Python?

Fabio Manzano