I'm using the Spark ML GBTClassifier in pyspark to train a binary classification model on a dataframe with ~400k rows and ~9k columns on an AWS EMR cluster. I'm comparing this against my current solution, which runs XGBoost on a huge EC2 instance that can fit the whole dataframe in memory.
My hope was that I could train (and score new observations) much faster in Spark because it would be distributed/parallel. However, when I watch my cluster (through Ganglia), I see that only 3-4 nodes have active CPU while the rest of the nodes sit idle. In fact, from the looks of it, it may be using only one node for the actual training.
I can't seem to find anything in the documentation about a node limit or partitions (I've added a partition check to the data-loading code below) or anything else that seems relevant to why this is happening. Maybe I'm just misunderstanding the implementation of the algorithm, but I assumed it was implemented in such a way that training could be parallelized to take advantage of the EMR/cluster aspect of Spark. If not, is there any advantage to doing it this way vs. just doing it in memory on a single EC2 instance? I guess you don't have to load the data into memory, but that's not really much of an advantage.
Here is some boilerplate of my code. Thanks for any ideas!
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Start the Spark context and a Hive-enabled SparkSession (despite the name, sqlContext is a SparkSession):
sc = pyspark.SparkContext()
sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate()
# load data
df = sqlContext.sql('SELECT label, features FROM full_table WHERE train = 1')
df.cache()
print("training data loaded: {} rows".format(df.count()))
test_df = sqlContext.sql('SELECT label, features FROM full_table WHERE train = 0')
test_df.cache()
print("test data loaded: {} rows".format(test_df.count()))
# Create evaluator (default metric is areaUnderROC); it will read the class-1 probability from the 'prob' column
evaluator = BinaryClassificationEvaluator()
evaluator.setRawPredictionCol('prob')
evaluator.setLabelCol('label')
# train model
gbt = GBTClassifier(maxIter=100,
                    maxDepth=3,
                    stepSize=0.1,
                    labelCol="label",
                    seed=42)
model = gbt.fit(df)
# get predictions
gbt_preds = model.transform(test_df)
gbt_preds.show(10)
# evaluate predictions
getprob = udf(lambda v: float(v[1]), DoubleType())  # pull the class-1 probability out of the vector
preds = gbt_preds.withColumn('prob', getprob('probability')) \
                 .drop('features', 'rawPrediction', 'probability', 'prediction')
preds.show(10)
auc = evaluator.evaluate(preds)
print("AUC: {}".format(auc))
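In case the Python UDF in the evaluation step matters, here is the variation I've seen for extracting the probability without a UDF. It assumes Spark 3.0+, where pyspark.ml.functions.vector_to_array exists; I haven't confirmed it changes the CPU picture during training, since it only touches scoring/evaluation.
from pyspark.ml.functions import vector_to_array  # Spark 3.0+ only

# Same 'prob' column as above, but built from the probability vector
# with a native function instead of a Python UDF.
preds = gbt_preds.withColumn('prob', vector_to_array('probability')[1]) \
                 .drop('features', 'rawPrediction', 'probability', 'prediction')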
Sidenote: the tables I am using are already vectorized. The model does run with this code; it's just slow (~10-15 minutes to train) and only uses 3-4 cores (or maybe just one).
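For completeness, here are the sanity checks I can run to see how much parallelism Spark actually thinks it has. sc is the SparkContext from the code above; spark.executor.instances and spark.executor.cores are the standard YARN conf keys and may simply be unset if EMR is choosing defaults:
# Default number of tasks Spark aims to run in parallel:
print("defaultParallelism: {}".format(sc.defaultParallelism))
# Executor settings the application was launched with (may be unset):
print("spark.executor.instances: {}".format(
    sc.getConf().get("spark.executor.instances", "not set")))
print("spark.executor.cores: {}".format(
    sc.getConf().get("spark.executor.cores", "not set")))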