I have a file like the following:
(1,object1)
(2,object2)
(3,object3)
...
The number is the ID of the object. I want to calculate the distance between every pair of objects in this file, i.e. I want a result like:
(1,1,0.0)
(1,2,1.2)
(1,3,1.3)
...
In each row, the first element is the ID of one object, the second element is the ID of the other object, and the last element is the distance between them. So if the original file has 3 lines, the result file will have 9 lines. The problem is that my original file has many lines and is very large, so I turned to Spark to do the calculation on a cluster. I use the following code (assuming the original file is file1.txt):
val fileRdd = sc.textFile("hdfs://../file1.txt")
val fileRdd2 = sc.textFile("hdfs://../file1.txt")
// Collect all objects to the driver, then broadcast them to every executor.
val objects = fileRdd.collect()
val objectsArr = sc.broadcast(objects)
// For each object, compute its distance to every broadcast object.
val distanceRes = fileRdd2.flatMap { x =>
  distanceCal(x, objectsArr.value)
}.saveAsTextFile(targetFile)
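For reference, distanceCal has roughly the following shape. This is a simplified sketch: for illustration only, I assume here that each object is a vector of space-separated doubles and use Euclidean distance; the real parsing and metric in my code may differ.

```scala
// Parse a line of the form "(id,objectN)", where objectN is assumed
// (for this sketch) to be space-separated doubles, e.g. "(1,0.0 0.0)".
def parseLine(line: String): (Long, Array[Double]) = {
  val body = line.stripPrefix("(").stripSuffix(")")
  val comma = body.indexOf(',')
  val id = body.substring(0, comma).toLong
  val vec = body.substring(comma + 1).split("\\s+").map(_.toDouble)
  (id, vec)
}

// For one object (a raw line) and the broadcast array of all raw lines,
// return (thisID, otherID, distance) for every other object.
def distanceCal(line: String, all: Array[String]): Array[(Long, Long, Double)] = {
  val (id1, v1) = parseLine(line)
  all.map { other =>
    val (id2, v2) = parseLine(other)
    // Euclidean distance, assumed here purely for illustration.
    val d = math.sqrt(v1.zip(v2).map { case (a, b) => (a - b) * (a - b) }.sum)
    (id1, id2, d)
  }
}
```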
In this snippet, the function distanceCal receives one object and an Array of objects, and returns an Array of (objectID, objectID, distance) tuples. The idea is to first broadcast all the objects to each executor, then do the double loop inside the flatMap. I submit the job with YARN using the following configuration:
--master yarn --num-executors 6 --executor-memory 8G --executor-cores 8 --conf spark.default.parallelism=144
I use 6 executors for this job and expected the calculation to be split across many tasks. However, in the saveAsTextFile stage the work runs on only two executors: it only runs on dn01 and dn07, while the other executors have no tasks.
So, what is the problem? Thank you very much!
To be more specific, I have tried the following code to increase the number of partitions, so that the final stage (saveAsTextFile) would run on more executors simultaneously. However, it still uses only the same two executors while the others have no tasks. How should I change my code so that more executors, not just two as in my example, are used in the final stage?
fileRdd2.repartition(1000).flatMap(...)