I have a List[Double]. How can I convert it to org.apache.spark.sql.Column? I am trying to insert it as a column into an existing DataFrame using .withColumn().
-
What are the `Double` elements in `List[Double]`? – Jacek Laskowski Jul 24 '16 at 17:16
-
@JacekLaskowski, it's just a list of numbers (double data type) that I want to add as a column to an existing DataFrame. – vdep Jul 25 '16 at 08:23
1 Answer
It cannot be done directly. Column is not a data structure but a representation of a specific SQL expression; it is not bound to any specific data. You'll have to transform your data first. One way to approach this is to parallelize the list and join by index:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DoubleType

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)

// Key both the DataFrame rows and the list elements by position,
// join on that key, and append the list value to each row.
val rows = df.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(aList).zipWithIndex.map(_.swap))
  .values
  .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }

sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
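Pairing by index lines 1.0, -1.0, and 0.0 up with the rows "a", "b", and "c" respectively. As a quick sanity check (row order after an RDD join is not guaranteed, so sort before inspecting):

val withZ = sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))
withZ.orderBy("x").show()
// x | y |    z
// a | 2 |  1.0
// b | 1 | -1.0
// c | 0 |  0.0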
Another, similar approach is to index and use a UDF to handle the rest:
import scala.util.Try
import org.apache.spark.sql.functions.udf

// Attach a position index to each row so it can be matched against the list.
val indexedDf = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
  },
  df.schema.add("idx_", "long")
)

// Look the value up by index; Try(...).toOption turns an out-of-range
// index into None, which surfaces as null in the resulting column.
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))
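Because the UDF returns an Option, an index with no matching list element becomes null instead of failing. Once the value column is attached, the helper index can be dropped:

indexedDf
  .withColumn("z", addValue(aList.toVector)($"idx_"))
  .drop("idx_")
  .show()
// x | y |    z
// a | 2 |  1.0
// b | 1 | -1.0
// c | 0 |  0.0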
Unfortunately both solutions suffer from the same issues. First of all, passing local data through the driver introduces a serious bottleneck in your program; typically data should be accessed directly by the executors. The other problem is growing RDD lineages if you want to perform this operation iteratively.
While the second issue can be addressed by checkpointing, the first one makes this idea useless in general. I would strongly suggest that you either build the complete structure first and read it in Spark, or rebuild your pipeline in a way that leverages the Spark architecture. For example, if the data comes from an external source, perform the reads directly for each chunk of data using map / mapPartitions.
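A minimal sketch of that last idea, assuming a hypothetical fetchChunk reader that pulls one chunk of the external data on the executor side (the stub below just generates random numbers and is a placeholder, not a real API; as before, sqlContext.implicits._ is assumed to be in scope):

import scala.util.Random

// Hypothetical per-chunk reader; replace with a real reader (JDBC range scan,
// file slice, paginated API call, ...). It runs on the executors, not the driver.
def fetchChunk(chunkId: Int): Iterator[Double] =
  Iterator.fill(1000)(Random.nextDouble())

// One chunk id per element; each task reads its own chunk directly,
// so no data is funneled through the driver.
val chunkIds = sc.parallelize(0 until 100, numSlices = 10)
val values = chunkIds.mapPartitions(_.flatMap(fetchChunk))
val valuesDf = values.map(Tuple1.apply).toDF("value")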
