
In my Spark DataFrame I have a column which contains the output of a CountVectorizer transformation - it is in sparse vector format. What I am trying to do is 'explode' this column again into a dense vector and then into its component columns (so that it can be used for scoring by an external model).

I know there are 40 features in the column, hence, following this example, I have tried:

import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector

// convert sparse vector to a dense vector, and then to array<double> 
val vecToSeq = udf((v: Vector) => v.toArray)

// Prepare a list of columns to create
val exprs = (0 until 40).map(i => $"_tmp".getItem(i).alias(s"exploded_col$i"))
testDF.select(vecToSeq($"features").alias("_tmp")).select(exprs:_*)

However, I get the weird error (see full error below):

data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;

Now it appears that maybe the CountVectorizer created a vector of type 'ml.linalg.Vector', so I have alternatively tried importing:

import org.apache.spark.ml.linalg.{Vector, DenseVector, SparseVector}

And then I get an error Caused by:

Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row

I have also tried converting the ml vector by altering the UDF to:

val vecToSeq = udf((v: Vector) =>  org.apache.spark.mllib.linalg.Vectors.fromML(v.toDense).toArray )

And get a similar cannot be cast to org.apache.spark.sql.Row error. Can anyone tell me why this is not working? Is there an easier way to explode a sparse vector in a DataFrame into separate columns? I've spent hours on this and cannot figure it out.

EDIT: The schema shows the feature column just as a vector:

  |-- features: vector (nullable = true)
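
For reference, printSchema reports the type only as vector; a small diagnostic sketch (assuming the DataFrame is called testDF and has at least one row) that shows which concrete Vector class actually backs the column:

// Prints the full StructField, including which VectorUDT (ml vs mllib) the column uses
println(testDF.schema("features"))
// Prints the runtime class of an actual value, e.g. class org.apache.spark.ml.linalg.SparseVector
println(testDF.select("features").head.get(0).getClass)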

Full error trace:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features)' due to data type mismatch: argument 1 requires vector type, however, 'features' is of vector type.;;
Project [UDF(features#325) AS _tmp#463]
. . . 
org.apache.spark.sql.cassandra.CassandraSourceRelation@47eae91d

        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
        at uk.nominet.renewals.prediction_test$.prediction_test(prediction_test.scala:292)
        at 

2 Answers


When working on such cases, I often decompose the problem step by step to see where the issue is coming from.

First, let's set up a DataFrame:

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector
val df = sc.parallelize(Seq((1L, Seq("word1", "word2")))).toDF("id", "words")
val countModel = new CountVectorizer().setInputCol("words").setOutputCol("feature").fit(df)
val testDF = countModel.transform(df)
testDF.show

+---+--------------+-------------------+
| id|         words|            feature|
+---+--------------+-------------------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|
+---+--------------+-------------------+

Now, what I would like is to select, say, the first component of feature, that is to say, to extract the first coordinate of the feature vector.

That could be written as v(0). Now I want my DataFrame to have a column that holds v(0), where v is the content of the feature column. I can use a user-defined function (UDF) for that:

val firstColumnExtractor = udf((v: Vector) => v(0))

And I try to add this column to my testDF:

testDF.withColumn("feature_0", firstColumnExtractor($"feature")).show
+---+--------------+-------------------+---------+                              
| id|         words|            feature|feature_0|
+---+--------------+-------------------+---------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|      1.0|
+---+--------------+-------------------+---------+

Note that I could just as well do it this way (this is just a matter of style, as far as I can tell):

testDF.select(firstColumnExtractor($"feature").as("feature_0")).show

This works, but that is a lot of work to repeat. Let's automate. First, I can generalize the extracting function to work at any index. Let's create a higher-order function (a function that creates functions):

def columnExtractor(idx: Int) = udf((v: Vector) => v(idx))

Now, I can rewrite the previous example:

testDF.withColumn("feature_0", columnExtractor(0)($"feature")).show

OK, so now I could do it this way:

testDF.withColumn("feature_0", columnExtractor(0)($"feature"))
      .withColumn("feature_1", columnExtractor(1)($"feature"))

That works for one column, but what about 40 dimensions? Well, let's automate some more. The above really is a foldLeft operation over each dimension:

(0 to 39).foldLeft(testDF)((df, idx) => df.withColumn("feature_"+idx, columnExtractor(idx)($"feature")))

This is just another way of writing your expression with multiple selects:

val featureCols = (0 to 1).map(idx => columnExtractor(idx)($"feature").as("feature_"+idx))
testDF.select((col("*") +: featureCols):_*).show
+---+--------------+-------------------+---------+---------+
| id|         words|            feature|feature_0|feature_1|
+---+--------------+-------------------+---------+---------+
|  1|[word1, word2]|(2,[0,1],[1.0,1.0])|      1.0|      1.0|
+---+--------------+-------------------+---------+---------+

Now, for performance reasons, you might want to convert your base Vector to an array of coordinates (or a DenseVector). Feel free to do that. I feel like a DenseVector or an Array will probably be very close performance-wise, so I would write it this way:

// A function to densify the feature vector
val toDense = udf((v:Vector) => v.toDense)
// Replace testDF's feature column with its dense equivalent
val denseDF = testDF.withColumn("feature", toDense($"feature"))
// Work on denseDF as we did on testDF 
denseDF.select((col("*") +: featureCols):_*).show
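
For the array route, here is a minimal sketch (assuming the same testDF, imports and two-feature example as above; vecToArray, feature_arr and arrayCols are illustrative names): convert the vector to array<double> once with a single UDF, then extract each coordinate with getItem, so the vector is deserialized once per row rather than once per extracted column.

// Convert the whole feature vector to array<double> in one UDF call per row
val vecToArray = udf((v: Vector) => v.toArray)
val arrayDF = testDF.withColumn("feature_arr", vecToArray($"feature"))
// Plain column expressions (no UDF) then extract each coordinate
val arrayCols = (0 to 1).map(idx => $"feature_arr".getItem(idx).as("feature_" + idx))
arrayDF.select((col("*") +: arrayCols):_*).show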
  • That is a fantastic answer - it really clarifies the syntax. Now, I can follow your example using the sample DF and it appears it works. But when I use my DF, even when I run your very first UDF, I get error: `Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row` - the same type of error I've been getting all along. – renegademonkey Jan 30 '18 at 16:30
  • I've been getting the same error when trying lots of different methods - it's driving me 'round the bend as I don't understand how to deal with it! – renegademonkey Jan 30 '18 at 16:44
  • Could you print your dataframe's schema object (e.g. `testDF.schema("features")`) and add it to your question for future reference? I suspect there are different types of Vector objects lying around, seeing you are using a Cassandra connector as well. – GPI Jan 30 '18 at 16:44
  • The schema is a plain `|-- features: vector (nullable = true)` – renegademonkey Jan 30 '18 at 20:08
  • And the `df.schema("features")` output of my DF is `StructField(feature,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)` - which is EXACTLY the same as the output from the testDF. This makes it all the stranger that this code works on one and not the other. – renegademonkey Jan 31 '18 at 08:28
  • Thanks for this answer. I need to do something similar except explode the SparseVector (generated from IDF transformation) into tuples of (index, idf_value) which can then be processed by ALS. Hopefully you've given me enough to pull it off! – Evan Zamir Oct 16 '18 at 22:26
  • @EvanZamir I’m really not sure it « means » anything valid to feed the output of IDF to an ALS algorithm. Nonetheless I suggest you ask your own question with a sample input and output to have a better grasp of what you need. (I guess you need a tuple of (index, valueAtIndex) but I’m not even sure). – GPI Oct 17 '18 at 07:01
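
The preceding comment asks about turning a SparseVector into (index, value) pairs. A hedged sketch of one way to do that with the example DataFrame above (toEntries and the output column names are illustrative, not from the answer):

import org.apache.spark.sql.functions.explode
// Emit the active (index, value) entries of each vector, one output row per entry
val toEntries = udf((v: Vector) => { val sv = v.toSparse; sv.indices.zip(sv.values) })
testDF.select($"id", explode(toEntries($"feature")).as("entry"))
      .select($"id", $"entry._1".as("index"), $"entry._2".as("value"))
      .show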

It appears to be an issue with your import statements. As you noticed, CountVectorizer will use the ml package vectors, and therefore all vector imports should also use this package. Make sure you do not have any imports using the older mllib package. These include:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.DenseVector

There are some methods only present in the mllib package, so in case you actually need to use this type of vector, you can rename it on import (since the name is otherwise the same as for the ml vectors). For example:

import org.apache.spark.mllib.linalg.{Vector => mllibVector}
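
If both vector types really do need to coexist, a minimal sketch of converting between them (the aliases and example values here are illustrative, not from the original code):

import org.apache.spark.ml.linalg.{Vector => MLVector, Vectors => MLVectors}
import org.apache.spark.mllib.linalg.{Vector => MLLibVector, Vectors => MLLibVectors}

val mlVec: MLVector = MLVectors.sparse(4, Array(0, 2), Array(1.0, 1.0)) // ml vector, as produced by CountVectorizer
val mllibVec: MLLibVector = MLLibVectors.fromML(mlVec)                  // ml -> mllib
val backToMl: MLVector = mllibVec.asML                                  // mllib -> ml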

After fixing all imports, your code should run. Test:

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector

val df = Seq((1L, Seq("word1", "word2", "word3")), (2L, Seq("word2", "word4"))).toDF("id", "words")
val countVec = new CountVectorizer().setInputCol("words").setOutputCol("features")
val testDF = countVec.fit(df).transform(df)

Will give a testing dataframe as follows:

+---+--------------------+--------------------+
| id|               words|            features|
+---+--------------------+--------------------+
|  1|[word1, word2, wo...|(4,[0,2,3],[1.0,1...|
|  2|      [word2, word4]| (4,[0,1],[1.0,1.0])|
+---+--------------------+--------------------+

Now to give each index its own column:

val vecToSeq = udf((v: Vector) => v.toArray)

val exprs = (0 until 4).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df2 = testDF.withColumn("features", vecToSeq($"features")).select(exprs:_*)

Resulting DataFrame:

+-------------+-------------+-------------+-------------+
|exploded_col0|exploded_col1|exploded_col2|exploded_col3|
+-------------+-------------+-------------+-------------+
|          1.0|          0.0|          1.0|          1.0|
|          1.0|          1.0|          0.0|          0.0|
+-------------+-------------+-------------+-------------+
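
If hard-coding the vector size (4 here, 40 in the question) is undesirable, a small sketch (assuming a non-empty DataFrame where all vectors have the same length, and reusing vecToSeq and the imports above; numFeatures, exprs2 and df3 are illustrative names) that reads the size from the first row instead:

// Read the vector length from the data rather than hard-coding it
val numFeatures = testDF.select("features").head.getAs[Vector](0).size
val exprs2 = (0 until numFeatures).map(i => $"features".getItem(i).alias(s"exploded_col$i"))
val df3 = testDF.withColumn("features", vecToSeq($"features")).select(exprs2:_*)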
  • Thanks for this. This SHOULD work, it's pretty much what I have been trying. When I run it on the testDF it works great, but on my real DF, which has an identical sparseVector column created by CountVectoriser, I still get errors (`Failed to execute user defined function(anonfun$1: (vector) => array)` . . . `Caused by: java.lang.ClassCastException: org.apache.spark.ml.linalg.SparseVector cannot be cast to org.apache.spark.sql.Row`). The `df.struct("features")` of both DFs are identical (`StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)`). I have no idea why? – renegademonkey Jan 31 '18 at 08:52
  • Also: all my imports are fine - the error appears to come from the initial `vecToSeq` function and a data format mismatch. – renegademonkey Jan 31 '18 at 09:00
  • @renegademonkey: How does the real dataframe look like? Can you add it to the question? – Shaido Jan 31 '18 at 09:04
  • It's too large (a mixture of 40 features with a combination of Integers, Timestamps, Doubles, Strings, Arrays), but this shouldn't be relevant? The same error occurs if I filter the whole DF down to just the `features` column and try to run `vecToSeq` on that. – renegademonkey Jan 31 '18 at 09:29
  • @renegademonkey: A sample of the `features` column would be sufficient then. If you take say 5 rows of that column and run the `vecToSeq`, does it still give the error? – Shaido Jan 31 '18 at 09:34
  • It's in the expected SparseVector format: `(40,[4],[1.0]) | (40,[1],[1.0]) | (40,[5],[1.0]) | (40,[0],[1.0])` – renegademonkey Jan 31 '18 at 09:41
  • @renegademonkey: Can you check if it works when running on part of the data? You can use `.take(x)` to try. – Shaido Jan 31 '18 at 09:53
  • @renegademonkey: Check so the versions of all spark libraries are correct and the same, see this question: https://stackoverflow.com/questions/44570303/failed-to-execute-user-defined-function-in-apache-spark-using-scala – Shaido Jan 31 '18 at 10:01
  • OMG, by looking into my `pom.xml` file to check the spark versions (which all match) I realised that I had been running this object without re-packaging the whole app. This works in most instances, but obviously not with some UDFs. [facepalm]. Now everything works as it should! Thank you for persevering with me. – renegademonkey Jan 31 '18 at 10:36
  • @renegademonkey: Glad you could solve it in the end :) – Shaido Jan 31 '18 at 10:37