What's the meaning of the title "Locality Level" and the 5 statuses data local --> process local --> node local --> rack local --> any?
2 Answers
As far as I know, the locality level indicates which type of access to the data was performed. When a node finishes all its work and its CPU becomes idle, Spark may decide to start other pending tasks that require obtaining data from other places. So ideally, all your tasks should be process local, since that level is associated with the lowest data access latency.
You can configure the wait time before falling back to a less local level using `spark.locality.wait`. More information about the parameters can be found in the Spark Configuration docs.
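For example, here is a minimal sketch of setting that wait via SparkConf. The config key is from the Spark docs; the app name and the concrete value are illustrative, not recommendations (time strings like "1s" assume Spark 1.4+):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative value only: a shorter wait makes the scheduler fall back to
// less-local levels sooner, trading data locality for CPU utilization.
val conf = new SparkConf()
  .setAppName("locality-wait-example")      // hypothetical app name
  .set("spark.locality.wait", "1s")         // global fallback timeout (default 3s)
val sc = new SparkContext(conf)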
With respect to the different levels PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, or ANY, I think the methods findTask and findSpeculativeTask in org.apache.spark.scheduler.TaskSetManager illustrate how Spark chooses tasks based on their locality level. It will first check for PROCESS_LOCAL tasks, which are going to be launched in the same executor process. If there are none, it will check for NODE_LOCAL tasks, whose data may be in other executors on the same node or may need to be retrieved from systems like HDFS, a cache, etc. RACK_LOCAL means that the data is on another node of the same rack and therefore needs to be transferred over the network prior to execution. Finally, ANY just takes any pending task that may run on the current node.
/**
 * Dequeue a pending task for a given node and return its index and locality level.
 * Only search for tasks matching the given locality constraint.
 */
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
{
  for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
    for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL))
    }
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- findTaskFromList(execId, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL))
    }
  }

  // Look for no-pref tasks after rack-local tasks since they can run anywhere.
  for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
    for (index <- findTaskFromList(execId, allPendingTasks)) {
      return Some((index, TaskLocality.ANY))
    }
  }

  // Finally, if all else has failed, find a speculative task
  findSpeculativeTask(execId, host, locality)
}
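As a quick way to see these levels in practice, here is a small sketch. It assumes a running cluster with an active SparkContext `sc` (e.g. the spark-shell), and the HDFS path is hypothetical. Caching an RDD usually turns subsequent passes PROCESS_LOCAL, which you can verify in the "Locality Level" column on the Stages tab of the Spark UI:

import org.apache.spark.storage.StorageLevel

val data = sc.textFile("hdfs:///some/input")  // hypothetical path
data.persist(StorageLevel.MEMORY_ONLY)

data.count()  // first pass: reads from HDFS, typically NODE_LOCAL or RACK_LOCAL
data.count()  // second pass: typically PROCESS_LOCAL against the cached partitions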

- Could you explain what you mean by "pending tasks"? I would think that a worker node's sole job is to run the tasks provided by the task scheduler. Once it is done running those tasks (maybe when the Spark application is done running), it stays idle. What are pending tasks then? – user3376961 Aug 25 '15 at 14:56
- @user3376961 I think that the following question may clarify what a task is in Spark. Bear in mind that you can also work with some level of elasticity, and that also shows the importance of not having a one-to-one relationship. http://stackoverflow.com/q/25276409/91042 – Daniel H. Sep 04 '15 at 08:58
Here are my two cents, summarized mostly from the Spark official guide.
Firstly, I want to add one more locality level, NO_PREF, which has been discussed in this thread.
Then, let's put those levels together into a single table (descriptions taken from the Spark tuning guide):

| Locality level | Meaning |
| --- | --- |
| PROCESS_LOCAL | Data is in the same JVM as the running code. This is the best locality possible. |
| NODE_LOCAL | Data is on the same node, e.g. in HDFS on the same node or in another executor on the same node. Slightly slower than PROCESS_LOCAL because the data has to travel between processes. |
| NO_PREF | Data is accessed equally quickly from everywhere and has no locality preference. |
| RACK_LOCAL | Data is on a different node of the same rack, so it needs to be sent over the network, typically through a single switch. |
| ANY | Data is elsewhere on the network and not in the same rack. |
Note that a specific level can be skipped, as per the guidance in the Spark configuration docs. For instance, if you want to skip NODE_LOCAL, just set `spark.locality.wait.node` to 0.
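A minimal sketch of that per-level override (the app name is hypothetical; `spark.locality.wait.node` and its siblings `spark.locality.wait.process` and `spark.locality.wait.rack` are documented Spark configuration keys):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("skip-node-local")        // hypothetical app name
  .set("spark.locality.wait.node", "0") // never wait for NODE_LOCAL;
                                        // fall straight through to RACK_LOCAL
val sc = new SparkContext(conf)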
