16

I'm trying to parallelize a machine learning prediction task via Spark. I've used Spark successfully a number of times before on other tasks and have faced no issues with parallelization before.

In this particular task, my cluster has 4 workers. I'm calling mapPartitions on an RDD with 4 partitions. The map function loads a model from disk (a bootstrap script distributes all that is needed to do this; I've verified it exists on each slave machine) and performs prediction on data points in the RDD partition.

The code runs, but only utilizes one executor. The logs for the other executors say "Shutdown hook called". On different runs of the code, it uses different machines, but only one at a time.

How can I get Spark to use multiple machines at once?

I'm using PySpark on Amazon EMR via Zeppelin notebook. Code snippets are below.

%spark.pyspark

sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")

from ModelLoader import ModelLoader
from MyClassifier import MyClassifier

def load_models():
    models_path = '/home/hadoop/models'
    model_loader = ModelLoader(models_path)

    models = model_loader.load_models()
    return models

def process_file(file_contents, models):
    filename = file_contents[0]
    filetext = file_contents[1]
    pred = MyClassifier.predict(filetext, models)
    return (filename, pred)

def process_partition(file_list):
    models = load_models()
    for file_contents in file_list:
        pred = process_file(file_contents, models)
        yield pred


all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')

There are four tasks as expected, but they all run on the same executor!

I have the cluster running and can provide logs as available in Resource Manager. I just don't know yet where to look.

Shaido
  • 27,497
  • 23
  • 70
  • 73
Ansari
  • 8,168
  • 2
  • 23
  • 34
  • Did you properly setup Zeppelin to work on yarn-cluster mode? AFAR, Zeppelin in EMR starts in local mode. – Zouzias Oct 22 '17 at 20:17
  • @Zouzias I've never had to do anything special to get it to use multiple workers properly before. I think the mode is correct. The "master" config value is set to "yarn-client". – Ansari Oct 22 '17 at 20:50
  • Is`"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"` specified in `capacity-scheduler` as it needs to be? Otherwise YARN will _not_ optimize cluster usage based on your job. – Glennie Helles Sindholt Oct 24 '17 at 08:24
  • can you please this line, all_contents.getNumPartitions and see how many partitions are available – loneStar Oct 26 '17 at 13:05
  • @Achyuth The number of partitions is 4 (or whatever number I pass in to wholeTextFiles) – Ansari Oct 28 '17 at 15:15
  • @GlennieHellesSindholt Can you point me to where I can view the capacity scheduler properties? They don't seem to show in the Resource Manager Environment page – Ansari Oct 28 '17 at 15:18
  • @Ansari, you can find the capacity schelduer in this particular location. vi /etc/hadoop/conf/capacity-scheduler.xml. But to my knowledge its more about the number of cores utilization – loneStar Oct 29 '17 at 17:07
  • @Ansari can you please try to partition the output with respect to map and rewrite the last command processedDF.write.json.partitionBy(logic)("s3://some-other-path", mode='overwrite'). I am thinking all the stages are transformations and last stage is action, which is writing only one file. because of which it is shutting down exectors – loneStar Oct 29 '17 at 18:20
  • @Achyuth Surprisingly, there are as many output files as there are partitions! The tasks run serially on one executor. – Ansari Oct 29 '17 at 19:03
  • Can you paste the spark-ui dag for me. To debug a little bit and also please set the property and restart the yarn, how to do that is sudo vi file, change the property and then do the following initctl list, sudo hadoop-yarn-resourcemanager stop , sudo hadoop-yarn-resourcemanager start. Please let me know the executor memory and executor cores. With out this particular property, it uses one core, so tasks might be serialized – loneStar Oct 29 '17 at 19:12
  • @Achyuth I'm in the process of doing all that. Do you want to gather your thoughts and paste them in an answer below? That way if this line of investigation bears fruit I can assign it the accepted answer – Ansari Oct 29 '17 at 19:54

2 Answers2

4

Two points to mention here (not sure if they will solve your issue though):

  1. wholeTextFiles uses WholeTextFileInputFormat which extends CombineFileInputFormat, and because of CombineFileInputFormat, it will try to combine groups of small files into one partition. So if you set the number of partition to 2 for example, you 'might' get two partitions but it is not guaranteed, it depends on the size of the files you are reading.
  2. The output of wholeTextFiles is an RDD that contains an entire file in each record (and each record/file cannot be split so it will end by being in a single partition/worker). So if you are reading one file only, you will end by having the full file in one partition despite that you set the partitioning to 4 in your example.
Rami
  • 8,044
  • 18
  • 66
  • 108
  • Thanks for the answer. I'm reading about 2000 files and there do seem to be 4 partitions (at least according to getNumPartitions) – Ansari Oct 28 '17 at 15:16
3

The process has as many as partitions you specified but it is going in serialized way.

Executors

The process might spin up default number of executors. This can be seen in the yarn resource manager. In your case all the processing is done by one executor. If executor has more than one core it will parellize the job. In emr you have do this changes in order to have more than 1 core for the executor.

What specifically happening in our case is, the data is small, so all the data is read in one executor(ie which is using one node). With out the following property the executor uses only single core. Hence all the tasks are serialized.

Setting the property

sudo  vi /etc/hadoop/conf/capacity-scheduler.xml

Setting the following property as shown

"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalcul‌​ator"

In order to make this property applicable you have to restart the yarn

 sudo  hadoop-yarn-resourcemanager stop

Restart the yarn

 sudo  hadoop-yarn-resourcemanager start 

When your job is submitted see the yarn and the spark-ui

In Yarn you will see more cores for executor

loneStar
  • 3,780
  • 23
  • 40