
I'm using the Spark ML GBTClassifier in pyspark to train a binary classification model on a dataframe with ~400k rows and ~9k columns on an AWS EMR cluster. I'm comparing this against my current solution, which is running XGBoost on a huge EC2 that can fit the whole dataframe in memory.

My hope was that I could train (and score new observations) much faster in Spark because it would be distributed/parallel. However, when I watch my cluster (through Ganglia), I see that only 3-4 nodes have active CPU while the rest of the nodes just sit there. In fact, from the looks of it, it may be using only one node for the actual training.

I can't seem to find anything in the documentation about a node limit or partitions or anything else relevant to why this is happening. Maybe I'm just misunderstanding the implementation of the algorithm, but I assumed it was implemented in such a way that training could be parallelized to take advantage of the EMR/cluster aspect of Spark. If not, is there any advantage to doing it this way vs. just doing it in memory on a single EC2? I guess you don't have to load the data into memory, but that's not really much of an advantage.

Here is a stripped-down version of my code. Thanks for any ideas!

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Start Spark context:
sc = pyspark.SparkContext()
sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate()

# load data
df = sqlContext.sql('SELECT label, features FROM full_table WHERE train = 1')
df.cache()
print("training data loaded: {} rows".format(df.count()))

test_df = sqlContext.sql('SELECT label, features FROM full_table WHERE train = 0')
test_df.cache()
print("test data loaded: {} rows".format(test_df.count()))


#Create evaluator
evaluator = BinaryClassificationEvaluator()
evaluator.setRawPredictionCol('prob')
evaluator.setLabelCol('label')

# train model
gbt = GBTClassifier(maxIter=100, 
                    maxDepth=3, 
                    stepSize=0.1,
                    labelCol="label", 
                    seed=42)

model = gbt.fit(df)


# get predictions
gbt_preds = model.transform(test_df)
gbt_preds.show(10)


# evaluate predictions
getprob = udf(lambda v: float(v[1]), DoubleType())
preds = gbt_preds.withColumn('prob', getprob('probability'))\
        .drop('features', 'rawPrediction', 'probability', 'prediction')
preds.show(10)

auc = evaluator.evaluate(preds)
auc

Sidenote: the tables I am using are already vectorized. The model does run with this code; it just runs slowly (~10-15 minutes to train) and only uses 3-4 (or maybe only one) of the cores.
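
One thing worth ruling out is that the cached training data is sitting in only a handful of partitions, since each boosting iteration can only run as many parallel tasks as there are partitions. A minimal sketch for checking and fixing that, reusing the `df` and `sc` variables from the code above (the target partition count is just an illustrative choice, not a rule):

# How many partitions is the cached training data split into?
print("training partitions: {}".format(df.rdd.getNumPartitions()))
print("default parallelism: {}".format(sc.defaultParallelism))

# If the data landed in only a few partitions, spread it out before training.
target_partitions = sc.defaultParallelism * 2  # illustrative multiplier
df = df.repartition(target_partitions).cache()
df.count()  # materialize the new cache before calling gbt.fit(df)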

seth127
  • So you're using EMR, right? Are you using `spark-submit` to submit the job? If so, can you post the entire command you use to submit the job? – xan Mar 08 '18 at 19:42
  • yes, I just use `spark-submit spark_gbt.py` (where `spark_gbt.py` is the name of the script above) from the command line ssh'd into the master node. Are you wondering about any specific settings on the cluster? I didn't set those up myself, but I could certainly track them down if you think that's relevant. I will say that other pyspark jobs I run on the same cluster use all the nodes. – seth127 Mar 08 '18 at 20:44
  • I'm trying to figure out if you're using the local mode or actually using the distributed mode. As far as I know, if you're not specifying `--master yarn`, then it will run in local mode on EMR. I also noticed another thing - you haven't cached any of your data in the code. You should run `df.cache()` and maybe even `df.count()` to initialize the cache before running the algorithm. – xan Mar 08 '18 at 23:27
  • It's running distributed mode. I ran with `--master yarn` to double-check and the behavior was the same. I also added the `df.cache()` and `df.count()` lines and nothing changed. (see edits above for placement.) – seth127 Mar 12 '18 at 17:20
  • I guess, to be more clear, my question is: do other people see this same behavior when they train models in Spark ML? – seth127 Mar 12 '18 at 17:38

1 Answer


Thanks for the clarifying comments above.

Spark's implementation isn't necessarily faster than XGBoost. In fact, what you're seeing is what I would expect.

The biggest factor is that XGBoost was designed and written specifically with Gradient Boosted Trees in mind. Spark, on the other hand, is far more general purpose and most likely doesn't have the same kind of optimizations that XGBoost has. See here for the difference between XGBoost's and scikit-learn's implementations of the classifier algorithm. If you really want to get into the details, you can read the paper, and even the code, behind both XGBoost's and Spark's implementations.

Remember, XGBoost is also parallel/distributed. It just uses multiple threads on the same machine. Spark helps you run the algorithm when the data doesn't fit on a single machine.
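
To make the comparison concrete, the single-machine baseline is roughly something like the sketch below (assuming the scikit-learn wrapper for XGBoost; the parameter values mirror the GBTClassifier settings in the question, and the data arrays are placeholders standing in for your in-memory dataframe):

import numpy as np
import xgboost as xgb

# Placeholder in-memory data standing in for the ~400k x ~9k dataframe.
X_train = np.random.rand(1000, 20)
y_train = np.random.randint(0, 2, size=1000)

# Roughly the same settings as the GBTClassifier above:
# 100 boosting rounds, depth-3 trees, learning rate 0.1.
xgb_model = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=3,
    learning_rate=0.1,
    n_jobs=-1,        # this is where the multi-threading on one box comes from
    random_state=42,
)
xgb_model.fit(X_train, y_train)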

A couple of other minor points I can think of: a) Spark has a non-trivial startup time, and communication across machines can also add up; b) XGBoost is written in C++, which is in general great for numerical computation.

As for why only 3-4 cores are being used by Spark, that depends on your dataset size, how it is partitioned across nodes, how many executors Spark is launching, which stage is taking the most time, memory configuration, etc. You can use the Spark UI to try to figure out what's going on. It's hard to say why that's happening for your dataset without looking at it.
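
As a starting point, you can print the relevant settings straight from the running session and compare them with what the Spark UI shows (a minimal sketch reusing the `sc` variable from your script; the keys are standard Spark properties, but the values on your cluster depend on how EMR was configured):

# Confirm the job is really running on YARN rather than in local mode.
print("master: {}".format(sc.master))  # expect 'yarn', not 'local[*]'

# Executor-related properties, if set explicitly; a missing value means
# Spark/YARN defaults (or dynamic allocation) are deciding for you.
conf = sc.getConf()
for key in ("spark.executor.instances",
            "spark.executor.cores",
            "spark.executor.memory",
            "spark.dynamicAllocation.enabled"):
    print("{} = {}".format(key, conf.get(key, "unset")))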

Hope that helps.

Edit: I just found this great answer comparing execution times between a simple Spark application and a standalone Java application - https://stackoverflow.com/a/49241051/5509005. The same principles apply here as well, in fact even more so since XGBoost is highly optimized.

xan
  • This is very helpful. Thank you. I vaguely had those trade-offs in mind, but it is good to have confirmation. My question is still: do other people see this same thing? I also see it when using the `LogisticRegression` classifier. Like you said, XGBoost is parallelized across cores, so you would think Spark would at least try to parallelize across nodes to compete. I tried this again with 14 million rows and it took over an hour but still only used one node (*not* the master node) for training. – seth127 Mar 13 '18 at 14:18
  • When training ML models, the only time I had to inspect Spark UI was running a Gradient Boosted Trees model - the same as you were looking at in the original post. I remember a number of nodes working - it was definitely more than 1 and less than the total number of nodes in the cluster. It didn't bother me because the cluster was pretty big -- 25-30 nodes. – xan Mar 13 '18 at 17:58
  • Update: I still can't get it to use more than 1 node to train, but if I multithread a few different `spark-submit` commands then it will send one model training job to each node. This is only helpful because I'm actually trying to train ~20 of these models each time (sort of a mini grid search). I'm still curious whether anyone knows if the Spark ML training is *supposed to be* distributed but as far as I can tell it is not. Thanks for all the help. – seth127 Mar 15 '18 at 13:26
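
Since that last update mentions training ~20 of these models as a mini grid search: another way to get several models training at once is to let Spark ML's own tuning utilities fit candidate models in parallel, instead of multithreading separate `spark-submit` jobs. A rough sketch reusing `gbt` and `df` from the question's code (the grid values are made up for illustration, and the `parallelism` argument requires Spark 2.3+):

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hypothetical grid standing in for the ~20 configurations being searched.
grid = (ParamGridBuilder()
        .addGrid(gbt.maxDepth, [3, 5])
        .addGrid(gbt.stepSize, [0.05, 0.1])
        .addGrid(gbt.maxIter, [50, 100])
        .build())

cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(labelCol="label"),
                    numFolds=3,
                    parallelism=4)  # fit this many candidate models concurrently

cv_model = cv.fit(df)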