2

I am trying to do type cast of all data frame columns based on an input as shown below, how can I do this with a single command that converts all data frame columns? The following code works fine, but I need a generic command that works for any data frame. Thanks in advance for your kind suggestions.

    for (colIndex <- 0 to tmpDF.columns.length - 1) {

      val columns = df.columns
      newdf = df.withColumn(columns(0), df(columns(0)).cast(dataType(0)))
        .withColumn(columns(1), df(columns(1)).cast(dataType(1)))
        .withColumn(columns(2), df(columns(2)).cast(dataType(2)))
        .withColumn(columns(3), df(columns(3)).cast(dataType(3)))
        .withColumn(columns(4), df(columns(4)).cast(dataType(4)))

   //  newdf = df.withColumn(columns(colIndex), df(columns(colIndex)).cast(dataType(colIndex))) --> This didn't work, only last column was updated

}
Garipaso
  • 391
  • 2
  • 8
  • 22

3 Answers3

1

It only updates the last column because you always append to column to df, you should keep appending to newDF This should work:

var newDF = df
val columns = df.columns
for (colIndex <- tmpDF.columns.indices) {
    newDF = newDF.withColumn(columns(colIndex), df(columns(colIndex)).cast(dataType(colIndex))) 
}
Raphael Roth
  • 26,751
  • 15
  • 88
  • 145
1

Both answers are doing many withColumn - there are ok, but each withColumn adds Projection to query plan. It's better to do one big select:

val columns = df.columns
val columnsNew = new Array[Column]
for (colIndex <- tmpDF.columns.indices) {
    columnsNew(colIndex) = columns(colIndex), df(columns(colIndex)).cast(dataType(colIndex))) 
}
var newDF = df.select(columnsNew :_*)

You should have only one projection, so it can be faster on very large dataset - CodeGen deal better with less amount of projections in query plan, that's why it's better to do single select with multiple columns :)

T. Gawęda
  • 15,706
  • 4
  • 46
  • 61
  • @RaphaelRoth I've used part of your code and I up voted your answer as "thank you" :) Hope it's ok for you – T. Gawęda Jul 08 '17 at 20:34
  • Nice option! It seems that the code can get more complicated if we want to process only a subset of the columns without removing the others or changing the order, in which case I think it might be worth sticking to `withColumn`, am I wrong? – Daniel de Paula Jul 08 '17 at 21:43
  • @DanieldePaula Yes, withColumn is very good for dealing with subset of columns or just adding or changing manually few columns :) Congrats for the Spark Top 20! :) – T. Gawęda Jul 19 '17 at 10:25
0

Another suggestion, using a more "functional way", is with foldLeft. In this case, your "dataType" variable can be a Map[String, DataType] (or Map[String, String]) mapping each column name to the new type you want to give :

import org.apache.spark.sql.types._
import spark.implicits._

val df = Seq((1, "1"), (2, "2")).toDF("col1", "col2")

val dataType: Map[String, DataType] = Map(
  "col1" -> StringType,
  "col2" -> IntegerType
)

val result = df.columns.foldLeft(df) { 
  (newDF, colName) => newDF.withColumn(colName, newDF(colName).cast(dataType(colName)))
}

result.show

Note that no mutable vars were needed, which in Scala should often be our goal.

If you want to keep using the indexes instead of the column names, you can adapt the code above accordingly:

val dataType = List(StringType, IntegerType)
val cols = df.columns
val result = cols.indices.foldLeft(df) { 
  (newDF, i) => newDF.withColumn(cols(i), newDF(cols(i)).cast(dataType(i)))
}

In both cases, the output is:

df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
result: org.apache.spark.sql.DataFrame = [col1: string, col2: int]

+----+----+
|col1|col2|
+----+----+
|   1|   1|
|   2|   2|
+----+----+

Here you can find some info about foldLeft.

Daniel de Paula
  • 17,362
  • 9
  • 71
  • 72