I have a train and a test dataset with features, and several thousand customerId values. My goal is to concurrently train one binary xgboost classifier per customerId in Spark.
I'm essentially trying to do what the poster in this thread is asking about, but in Scala instead of PySpark. From reading around, I understand the answer lies in Spark's Job Scheduling documentation, which says I need to initiate the training of each model from a separate thread; I've put my rough guess at what that looks like at the end of this post, but I'm unsure whether it's the right implementation.
So far my code looks something like this:
import scala.annotation.tailrec
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.sql.DataFrame
import spark.implicits._ // needed for the $"customerId" column syntax

// Data
val train: DataFrame = ...
val test: DataFrame = ...
// Model
val xgbClassifier: XGBoostClassifier = ...
// List of unique customerId's
val customers: List[Int] = ...
// Function for training and predicting for a given customer
def trainAndPredict(customer: Int): DataFrame = {
val train_subset = train.filter($"customerId" === customer)
val test_subset = test.filter($"customerId" === customer)
...
}
// Recursively train and predict for all customers
@tailrec
final def recTrainAndPredict(customers: List[Int], acc: DataFrame): DataFrame = customers match {
case Nil => acc
case x :: xs => recTrainAndPredict(xs, acc.union(trainAndPredict(x)))
}
// Seed the recursion with the first customer's result; unioning with the
// schemaless spark.emptyDataFrame would fail on mismatched column counts
val result = customers match {
  case Nil => spark.emptyDataFrame
  case x :: xs => recTrainAndPredict(xs, trainAndPredict(x))
}
The code runs, but it trains the models one at a time, and I'm guessing it also wastes a lot of time by spreading each small per-customer dataset across different nodes.
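The only idea I've had for that second part is to force each customer's subset onto a single partition inside trainAndPredict, something like the tweak below. This is an untested guess on my part, and I don't know how it interacts with the way xgboost4j-spark parallelizes training:

// Hypothetical tweak inside trainAndPredict: coalesce each (small) subset
// down to one partition so its processing isn't scattered across the cluster
val train_subset = train.filter($"customerId" === customer).coalesce(1)
val test_subset = test.filter($"customerId" === customer).coalesce(1)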
How would I go about getting different calls of trainAndPredict to run concurrently, without sacrificing time by distributing each small job across different nodes?
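For completeness, here is my naive, untested attempt at the threaded version the Job Scheduling docs seem to describe. I gather the default global execution context caps parallelism at the number of local cores, so a dedicated thread pool might be needed instead:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Submit one training job per customer from its own thread, per the
// Job Scheduling docs; trainAndPredict is the same function as above
val futures: List[Future[DataFrame]] =
  customers.map(customer => Future(trainAndPredict(customer)))

// Block until every per-customer job finishes, then combine the
// predictions (assumes customers is non-empty)
val concurrentResult: DataFrame =
  futures.map(f => Await.result(f, Duration.Inf)).reduce(_ union _)

I used Await here just to keep the sketch simple; I assume Future.sequence followed by a single wait would work equally well. Is this roughly the intended pattern?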