As a continuation of this question, could you please tell me which properties I can change via SparkContext.setLocalProperty? Could I change cores, RAM, etc.?
-
I think this part should give you available properties: http://spark.apache.org/docs/latest/configuration.html#available-properties – Adonis Mar 22 '17 at 14:02
-
@asettouf does that mean I can run multiple jobs with different properties simultaneously on Standalone cluster manager? – VB_ Mar 22 '17 at 14:04
-
From my understanding, it sets properties on your running thread, so theoretically it should be possible to spawn a new thread, use the same SparkContext, and set different local properties (watch out not to create several SparkContexts), as the doc seems to suggest (I did not test it, so I cannot confirm it will work): http://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/SparkContext.html#setLocalProperty(java.lang.String,%20java.lang.String) – Adonis Mar 22 '17 at 14:10
-
@asettouf agree, I need to write a proof of concept – VB_ Mar 22 '17 at 14:11
3 Answers
As per the documentation, localProperties is a protected[spark] field of SparkContext holding the properties through which you can create logical job groups. They are also inheritable thread-local variables, which means they are used in preference to ordinary thread-local variables when a per-thread attribute maintained in the variable must be automatically transmitted to any child threads that are created. Local properties are propagated to the workers when SparkContext is requested to run or submit a Spark job, which in turn passes them along to DAGScheduler.
In general, local properties are used to group jobs into pools in the FAIR job scheduler via the spark.scheduler.pool per-thread property, and in SQLExecution.withNewExecutionId to set spark.sql.execution.id.
I have no experience assigning thread-local properties in a standalone Spark cluster, but it is worth trying and checking; a sketch of the pooling use case is below.
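To illustrate that pooling use case, here is a minimal, untested sketch (the class name, pool name, and sample data are made up for illustration) of how a per-thread spark.scheduler.pool could be set when the FAIR scheduler is enabled:
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PoolExample {
    public static void main(String[] args) throws InterruptedException {
        // Enable the FAIR scheduler so that spark.scheduler.pool has an effect.
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("Pool Example")
                .set("spark.scheduler.mode", "FAIR");
        JavaSparkContext sc = new JavaSparkContext(conf);

        Thread worker = new Thread(() -> {
            // The pool assignment is per-thread: jobs submitted from this
            // thread are routed to the (hypothetical) pool "pool_a".
            sc.setLocalProperty("spark.scheduler.pool", "pool_a");
            long count = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
            System.out.println("pool_a count: " + count);
        });
        worker.start();
        worker.join();
        sc.stop();
    }
}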

I did some testing with the property spark.executor.memory (the available properties are here), and on a very simple local Spark setup, starting two threads each with different settings, the values do seem to be confined to their threads. With the code at the end of this post (probably not code you would deploy into production), which interleaves the two threads to make sure it is not just sheer scheduling luck, I obtain the following output (with Spark's own console output cleaned up):
Thread 1 Before sleeping mem: 512
Thread 2 Before sleeping mem: 1024
Thread 1 After sleeping mem: 512
Thread 2 After sleeping mem: 1024
Pretty neat to observe that a property declared in a thread stays inside that thread, although I am pretty sure it can easily lead to nonsensical situations, so I'd still recommend caution before applying such techniques.
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import scala.Tuple2;

public class App {
    private static JavaSparkContext sc;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local")
                .setAppName("Testing App");
        sc = new JavaSparkContext(conf);

        SparkThread thread1 = new SparkThread(1);
        SparkThread thread2 = new SparkThread(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Submit the first thread, wait a bit, then submit the second one
        // so that their executions interleave.
        Future<?> threadCompletion1 = executor.submit(thread1);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        Future<?> threadCompletion2 = executor.submit(thread2);
        try {
            threadCompletion1.get();
            threadCompletion2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        sc.stop();
    }

    private static class SparkThread implements Runnable {
        private final int i;

        public SparkThread(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            int mem = 512;
            // Local properties are per-thread: each thread sets its own value.
            sc.setLocalProperty("spark.executor.memory", Integer.toString(mem * i));

            JavaRDD<String> input = sc.textFile("test" + i);
            FlatMapFunction<String, String> tt = s -> Arrays.asList(s.split(" ")).iterator();
            JavaRDD<String> words = input.flatMap(tt);

            System.out.println("Thread " + i + " Before sleeping mem: "
                    + sc.getLocalProperty("spark.executor.memory"));
            try {
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // Do some work: a word count written to a per-thread output directory.
            JavaPairRDD<String, Integer> ones = words.mapToPair(t -> new Tuple2<>(t, 1));
            JavaPairRDD<String, Integer> counts = ones.reduceByKey((x, y) -> x + y);
            counts.saveAsTextFile("output" + i);

            System.out.println("Thread " + i + " After sleeping mem: "
                    + sc.getLocalProperty("spark.executor.memory"));
        }
    }
}

LocalProperties provide an easy mechanism to pass (user-defined) configuration from the driver to the executors. You can use the TaskContext on the executor to access them. An example of this is the SQL execution ID.
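For example, here is a minimal, untested sketch (the property name my.job.tag and its value are made up for illustration) that sets a local property on the driver thread and reads it back inside a task through TaskContext:
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalPropertyOnExecutor {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Local Property Example");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Set a user-defined local property on the driver thread.
        sc.setLocalProperty("my.job.tag", "batch-42");

        // Read it back on the executor side through the TaskContext of the running task.
        sc.parallelize(Arrays.asList(1, 2, 3))
          .foreach(x -> System.out.println(
                  "tag seen by task: " + TaskContext.get().getLocalProperty("my.job.tag")));

        sc.stop();
    }
}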

-
Please add more details. Could you give me an example of how to change cores/RAM within the same session for different jobs? – VB_ Mar 27 '17 at 12:55
-
@VolodymyrBakhmatiuk Not sure I understand your question, as given the available Spark properties, you can modify the driver cores and memory, as well as the executor memory per job, as long as you encapsulate them correctly in different threads. Am I missing something here? – Adonis Mar 28 '17 at 11:32
-
Local properties are not meant to set cores/RAM; they are meant to pass properties that are local to the driver thread on to the executors. You need to configure the cores/memory a Spark program uses before starting the SparkContext. – Herman van Hovell Mar 28 '17 at 13:53