I am trying to do the following:

+-----+-------------------------+----------+-------------------------------------------+
|label|features                 |prediction|probability                                |
+-----+-------------------------+----------+-------------------------------------------+
|0.0  |(3,[],[])                |0         |[0.9999999999999979,2.093996169658831E-15] |
|1.0  |(3,[0,1,2],[0.1,0.1,0.1])|0         |[0.999999999999999,9.891337521299582E-16]  |
|2.0  |(3,[0,1,2],[0.2,0.2,0.2])|0         |[0.9999999999999979,2.0939961696578572E-15]|
|3.0  |(3,[0,1,2],[9.0,9.0,9.0])|1         |[2.093996169659668E-15,0.9999999999999979] |
|4.0  |(3,[0,1,2],[9.1,9.1,9.1])|1         |[9.89133752128275E-16,0.999999999999999]   |
|5.0  |(3,[0,1,2],[9.2,9.2,9.2])|1         |[2.0939961696605603E-15,0.9999999999999979]|
+-----+-------------------------+----------+-------------------------------------------+

Convert the above DataFrame to have two more columns, prob1 and prob2, each holding the corresponding value from the probability column.

I found similar questions - one in PySpark and the other in Scala. I do not know how to translate the PySpark code and I am getting an error with the Scala code.

PySpark Code:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

split1_udf = udf(lambda value: value[0].item(), FloatType())
split2_udf = udf(lambda value: value[1].item(), FloatType())

output2 = randomforestoutput.select(split1_udf('probability').alias('c1'), split2_udf('probability').alias('c2'))

Or to append these columns to the original dataframe:

randomforestoutput.withColumn('c1', split1_udf('probability')).withColumn('c2', split2_udf('probability'))

Scala Code:

import org.apache.spark.sql.functions.udf

val getPOne = udf((v: org.apache.spark.mllib.linalg.Vector) => v(1))
model.transform(testDf).select(getPOne($"probability"))

I get the following error when I run the Scala code:

scala> predictions.select(getPOne(col("probability"))).show(false)
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(probability)' due to data type mismatch: argument 1 requires vector type, however, '`probability`' is of vector type.;;
'Project [UDF(probability#39) AS UDF(probability)#135]
+- Project [label#0, features#1, prediction#34, UDF(features#1) AS probability#39]
   +- Project [label#0, features#1, UDF(features#1) AS prediction#34]
      +- Relation[label#0,features#1] libsvm

I am currently using Scala 2.11.11 and Spark 2.1.1

zero323
modin3956

1 Answer

As I understand your question, you are trying to split the probability column into two columns, prob1 and prob2. If that's the case, simple array indexing combined with withColumn should solve your issue.

predictions
  .withColumn("prob1", $"probability"(0))
  .withColumn("prob2", $"probability"(1))
  .drop("probability")

Spark provides more functions that can be applied to DataFrames, which may help you in the future.

Edited

I created a temporary DataFrame to match your column:

val predictions = Seq(Array(1.0,2.0), Array(2.0939961696605603E-15,0.9999999999999979), Array(Double.NaN,Double.NaN)).toDF("probability")
+--------------------------------------------+
|probability                                 |
+--------------------------------------------+
|[1.0, 2.0]                                  |
|[2.0939961696605603E-15, 0.9999999999999979]|
|[NaN, NaN]                                  |
+--------------------------------------------+

Applying the withColumn calls above gave:

+----------------------+------------------+
|prob1                 |prob2             |
+----------------------+------------------+
|1.0                   |2.0               |
|2.0939961696605603E-15|0.9999999999999979|
|NaN                   |NaN               |
+----------------------+------------------+

Schema mismatch Edit

Since your probability column has a Vector schema rather than the ArrayType schema assumed above, the array-indexing solution will not work in your case. Please use the following solution instead.

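You will have to create UDFs that each return the desired element of the vector:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// Vector must be org.apache.spark.ml.linalg.Vector, not the mllib or Scala collection Vector
val first = udf((v: Vector) => v.toArray(0))
val second = udf((v: Vector) => v.toArray(1))
predictions
  .withColumn("prob1", first($"probability"))
  .withColumn("prob2", second($"probability"))
  .drop("probability")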
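For anyone who wants to try the UDF approach end to end, here is a minimal self-contained sketch. It assumes the probability column holds `org.apache.spark.ml.linalg.Vector` values; the local `SparkSession`, the two sample rows, and the name `split` are made up for illustration and are not from the question's data.

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("split-probability")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the model output: a column of ml.linalg vectors.
val predictions = Seq(
  (0.0, Vectors.dense(0.9, 0.1)),
  (1.0, Vectors.dense(0.2, 0.8))
).toDF("label", "probability")

// One UDF per element; v(i) reads the i-th value of the vector.
val first = udf((v: Vector) => v(0))
val second = udf((v: Vector) => v(1))

val split = predictions
  .withColumn("prob1", first($"probability"))
  .withColumn("prob2", second($"probability"))
  .drop("probability")

split.show(false)
```

Using `v(i)` instead of `v.toArray(i)` avoids materializing an array for sparse vectors, but both forms should give the same values here.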

I hope you get the desired result.
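As a side note, on Spark 3.0 or later (so not the 2.1.1 mentioned in the question), `org.apache.spark.ml.functions.vector_to_array` may let you skip the UDFs and go back to plain array indexing. The sketch below assumes such a version is available; the session setup and sample rows are again hypothetical.

```scala
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("vector-to-array")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the model output.
val predictions = Seq(
  (0.0, Vectors.dense(0.9, 0.1)),
  (1.0, Vectors.dense(0.2, 0.8))
).toDF("label", "probability")

// Convert the vector column to array<double>, then index it like any array column.
val probs = vector_to_array($"probability")

val split = predictions
  .withColumn("prob1", probs(0))
  .withColumn("prob2", probs(1))
  .drop("probability")

split.show(false)
```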

Ramesh Maharjan
  • The correct `Vector` data type to use is `org.apache.spark.ml.linalg.Vector`. Thanks a lot for your help and patience! Really appreciate it! – modin3956 Jun 16 '17 at 18:11