
I have a Spark DataFrame df with the following schema:

root
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = false)

I would like to create a new DataFrame where each row is a Vector of Doubles, expecting to get the following schema:

root
     |-- features: vector (nullable = true)

So far I have the following piece of code (influenced by this post: Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala), but I fear something is wrong with it because it takes a very long time to process even a reasonable number of rows. Also, if there are too many rows, the application crashes with a heap space exception.

import scala.collection.mutable
import org.apache.spark.ml.linalg.{Vector, Vectors}
import spark.implicits._  // assuming spark is the active SparkSession; needed for toDF()

val clustSet = df.rdd.map { r =>
  val arr = r.getAs[mutable.WrappedArray[Double]]("features")
  val features: Vector = Vectors.dense(arr.toArray)
  features
}.map(Tuple1(_)).toDF()

I suspect that the arr.toArray call is not good Spark practice in this case. Any clarification would be very helpful.

Thank you!

user159941

1 Answer


It's because .rdd has to deserialize objects from Spark's internal in-memory format, and that is very time-consuming.

It's OK to use .toArray; you are operating at the row level, not collecting everything to the driver node.

You can do this very easily with a UDF:

import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.udf

// Wrap the array-to-vector conversion in a UDF and apply it column-wise
val convertUDF = udf((array: Seq[Double]) => {
  Vectors.dense(array.toArray)
})

val withVector = dataset
  .withColumn("features", convertUDF('features))

Code is from this answer: Convert ArrayType(FloatType,false) to VectorUTD

However, the author of that question didn't ask about the differences between the approaches.
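As a quick sanity check (assuming the resulting DataFrame is named withVector, as above), printing the schema should show the vector type the question was asking for:

withVector.printSchema()
// Expected output, with the column now stored as Spark ML's VectorUDT:
// root
//  |-- features: vector (nullable = true)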

T. Gawęda
  • Thank you very much, that helped a lot and I marked it as the answer. I can run more rows now and the timing is satisfactory. I still get an exception, though: __org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1__ when I try 200,000 rows. Would you have any insight into this? Thanks again. – user159941 May 18 '17 at 16:06
  • @user159941 Please check http://stackoverflow.com/questions/31947335/how-kryo-serializer-allocates-buffer-in-spark – T. Gawęda May 18 '17 at 16:07
  • 1
    I set in my code the following: **val conf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryoserializer.buffer.max.mb","256")** and it worked! Thank you. – user159941 May 19 '17 at 09:32
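For reference, a minimal sketch of the configuration described in the comment above; the 256 MB buffer size is just the value that worked for the asker, and the SparkSession is assumed to be built from this conf:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Enable Kryo and raise the maximum serialization buffer (value from the comment above)
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max.mb", "256")

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()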