In PySpark I've got a large dataset loaded which I'm running through my GBTClassifier. Prior to training/fitting, performing a groupBy on the input data produces the expected results (the values add up to the expected count, etc.). However, after fitting and transforming the test data, a groupBy on the predictions does not give reproducible results. I'm trying to produce basic precision/recall figures, so I'm splitting into groups by label and prediction. The output values don't vary by a huge amount, but they do move around and aren't reliable. I haven't used MulticlassMetrics because I want to explore different classification probability thresholds, though at this point I'd be open to it; I haven't been able to get my output DataFrame into a format that MulticlassMetrics accepts.
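If it helps, my understanding is that MulticlassMetrics (from pyspark.mllib) takes an RDD of (prediction, label) pairs of doubles rather than a DataFrame, so a conversion along these lines should be what it expects (a sketch only, assuming the newPrediction and label columns from my code below):

```python
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import functions as F

# MulticlassMetrics is RDD-based: cast both columns to double and
# map each Row to a plain (prediction, label) tuple
predictionAndLabels = (prediction_gbm
    .select(F.col('newPrediction').cast('double'),
            F.col('label').cast('double'))
    .rdd.map(lambda row: (row[0], row[1])))

metrics = MulticlassMetrics(predictionAndLabels)
print(metrics.precision(1.0), metrics.recall(1.0))
```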
I've tried groupBy with count() as well as filtering on the specific sets of data to see if the two different approaches might elicit different results (i.e. whether the data in the column wasn't being matched by the filter).
It's worth mentioning that I'm working on AWS in EMR Notebooks, on a 4-node cluster.
from pyspark.ml.classification import GBTClassifier
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

train_df = splits[0]
test_df = splits[1]
gbm = GBTClassifier(stepSize=0.1, seed=2018)
model_gbm = gbm.fit(train_df)
prediction_gbm = model_gbm.transform(test_df)
#Split the probability column into two values to allow assessment of different classification thresholds
#to_array is a UDF defined elsewhere that converts the probability vector to an array
prediction_gbm = (prediction_gbm
    .withColumn("probability_split", to_array(col("probability")))
    .withColumn('prob_norm', col("probability_split")[0])
    .withColumn('prob_fraud', col("probability_split")[1]))
#Test a new threshold
newPrediction = when(col('prob_fraud')>0.5,1).otherwise(0)
prediction_gbm = prediction_gbm.withColumn('newPrediction',newPrediction)
#This section simply prints the results of my grouping. This is what is producing inconsistent results
gbm_FN=prediction_gbm.filter((F.col('label')==1) & (F.col('newPrediction')==0)).count()
gbm_FP=prediction_gbm.filter((F.col('label')==0) & (F.col('newPrediction')==1)).count()
gbm_TP=prediction_gbm.filter((F.col('label')==1) & (F.col('newPrediction')==1)).count()
gbm_TN=prediction_gbm.filter((F.col('label')==0) & (F.col('newPrediction')==0)).count()
#Here is the groupBy code as well for clarification
prediction_gbm.groupBy(['label','prediction']).count().show()
I would expect the values output for the four groupings of label and prediction to add up consistently. Additionally, I'd expect the results of the groupBy to match the four filter counts above and to add up to the same total.
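For context, the precision/recall I'm ultimately after is just this arithmetic over the four counts, which is why I need the four cells to be stable and to sum to the test-set size (hypothetical numbers here, not from an actual run):

```python
# Illustrative arithmetic only: hypothetical values standing in for the
# gbm_TP / gbm_FP / gbm_FN / gbm_TN counts computed above
gbm_TP, gbm_FP, gbm_FN, gbm_TN = 80, 10, 20, 890

precision = gbm_TP / (gbm_TP + gbm_FP)  # of everything flagged fraud, how much really is
recall = gbm_TP / (gbm_TP + gbm_FN)     # of all real fraud, how much was flagged

total = gbm_TP + gbm_FP + gbm_FN + gbm_TN  # should equal the test-set row count
print(precision, recall, total)
```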
EDIT: When I train my model, I get this error on the first pass, but on subsequent runs I don't see the issue:
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/opt/conda/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/opt/conda/lib/python3.6/site-packages/awseditorssparkmonitoringwidget-1.0-py3.6.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 905