
Since VectorAssembler crashes if a passed column has any type other than NumericType or BooleanType, and I'm dealing with a lot of TimestampType columns, I want to know:

Is there an easy way to cast multiple columns at once?

Based on this answer I already have a convenient way to cast a single column:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

def castColumnTo(df: DataFrame,
    columnName: String,
    targetType: DataType): DataFrame = {
  df.withColumn(columnName, df(columnName).cast(targetType))
}
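A minimal usage sketch (the column name created_at is a made-up example, and df is assumed to be an existing DataFrame with such a column):

import org.apache.spark.sql.types.LongType

// cast a hypothetical TimestampType column to LongType (epoch seconds)
val fixed = castColumnTo(df, "created_at", LongType)
fixed.printSchema() // created_at is now LongType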

I thought about calling castColumnTo recursively, but I strongly doubt that's the (performant) way to go.

Boern

  • What's stopping you from iterating over the columns and calling this function (no recursion needed)? – LiMuBei Feb 02 '17 at 08:40
  • Why would you need to recurse? Do you mean iterate? Remember Spark is lazy, so there's no obvious reason why it would not be sufficiently performant – The Archetypal Paul Feb 02 '17 at 08:40

4 Answers


Casting all columns of a given source type, with an idiomatic Scala approach:

def castAllTypedColumnsTo(df: DataFrame, sourceType: DataType, targetType: DataType): DataFrame =
  df.schema.filter(_.dataType == sourceType).foldLeft(df) {
    case (acc, field) => acc.withColumn(field.name, acc(field.name).cast(targetType))
  }
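For the question's use case this could be invoked as follows (a sketch; df is assumed to exist, and Timestamp-to-Long is just one possible choice of types):

import org.apache.spark.sql.types.{LongType, TimestampType}

// cast every TimestampType column in df to LongType
val casted = castAllTypedColumnsTo(df, TimestampType, LongType)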
rogue-one
  • @TheArchetypalPaul removed the unnecessary assignment statement and made the suggested change – rogue-one Feb 02 '17 at 09:25
  • Yep, pretty clear now. I do approve, however, of the OP's split into two lines, because it gives the opportunity to add the logging that he used. You can do an awful lot in one line of Scala, but sometimes when you come back to it, it takes quite a while to work out what it does. But it's a matter of taste and maybe it's just me. I'll remove my first comment – The Archetypal Paul Feb 02 '17 at 09:28
  • Both answers are helpful, but for both answers, if I cast 1000 columns, how many instances of DataFrame are being created? Will foldLeft improve garbage collection? – Jake Aug 31 '17 at 18:09
  • I ask because the withColumn function creates a new DataFrame on each call, despite the foldLeft maintaining one accumulator. – Jake Aug 31 '17 at 18:25
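Each withColumn call does return a new DataFrame, but these are thin wrappers around a logical plan, and Catalyst collapses the stacked projections, so the cost is mostly plan-analysis overhead on the driver rather than copied data. One way to avoid the chain entirely is to build every cast up front and issue a single select; a minimal sketch (the function name is made up, and the imports match those above):

import org.apache.spark.sql.functions.col

def castAllTypedColumnsViaSelect(df: DataFrame,
    sourceType: DataType, targetType: DataType): DataFrame = {
  // build one projection: cast matching columns, pass the rest through unchanged
  val projection = df.schema.map { field =>
    if (field.dataType == sourceType) col(field.name).cast(targetType).alias(field.name)
    else col(field.name)
  }
  df.select(projection: _*)
}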

Based on the comments (thanks!) I came up with the following code (no error handling implemented):

def castAllTypedColumnsTo(df: DataFrame,
    sourceType: DataType, targetType: DataType): DataFrame = {

  val columnsToBeCasted = df.schema
    .filter(_.dataType == sourceType)

  // optional logging of what will be cast:
  // if (columnsToBeCasted.nonEmpty) {
  //   println(s"Found ${columnsToBeCasted.length} columns " +
  //     s"(${columnsToBeCasted.map(_.name).mkString(",")})" +
  //     s" - casting to ${targetType.typeName.capitalize}Type")
  // }

  columnsToBeCasted.foldLeft(df) { (foldedDf, field) =>
    castColumnTo(foldedDf, field.name, targetType)
  }
}

Thanks for the inspiring comments. foldLeft (explained here and here) replaces a for loop that would otherwise iterate over a var DataFrame.
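For comparison, this is the var-based loop that foldLeft replaces (illustrative only, reusing the names from the code above; both produce the same result):

// mutable style: reassign a var on every iteration
var result = df
for (field <- columnsToBeCasted) {
  result = castColumnTo(result, field.name, targetType)
}

// foldLeft threads the accumulator through instead, no var needed
val result2 = columnsToBeCasted.foldLeft(df) { (acc, field) =>
  castColumnTo(acc, field.name, targetType)
}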

Boern
  • You could use `foldLeft` instead of the `for` loop and avoid the `var` for what I would argue is improved style. `val dfReturn = columnsToBeCasted.foldLeft(df){(accdf, col) => castColumnTo(accdf, col.name, LongType)}` – The Archetypal Paul Feb 02 '17 at 09:01
  • @TheArchetypalPaul out of curiosity.. why are you not posting an answer even though clearly you know the answer :) – rogue-one Feb 02 '17 at 09:18
  • @rogue-one can't be bothered :) I don't really need the rep, and was only tweaking Boern's answer, so he should get the credit. And I'm at work so I'm in drive-by mode anyway – The Archetypal Paul Feb 02 '17 at 09:20
  • Thanks for the comments! I'll update my answer. For anybody who's interested in folding: https://oldfashionedsoftware.com/2009/07/30/lots-and-lots-of-foldleft-examples/ – Boern Feb 02 '17 at 09:22
  • @Boern, `fold` is even cooler than that: http://www.cs.nott.ac.uk/~pszgmh/fold.pdf – The Archetypal Paul Feb 02 '17 at 09:24
  • @rogue-one by commenting I ended up studying and learning much more than by just copy-pasting his answer :) – Boern Feb 02 '17 at 09:34
from pyspark.sql.types import StringType, FloatType, IntegerType

FastDf = spark.read.csv("Something.csv", header=False, mode="DROPMALFORMED")
OldTypes = [field.dataType for field in FastDf.schema.fields]
NewTypes = [StringType(), FloatType(), FloatType(), IntegerType()]
OldColnames = FastDf.columns
NewColnames = ['S_tring', 'F_loat', 'F_loat2', 'I_nteger']
FastDfSchema = FastDf.select(*(FastDf[colnumber]
                               .cast(NewTypes[colnumber])
                               .alias(NewColnames[colnumber])
                               for colnumber in range(len(NewTypes))))

I know it is PySpark, but the logic might be handy.

András Nagy

I am translating a Scala program to Python and found a smart answer to this problem. The columns are named V1 - V28, Time, Amount, Class. (I am not a Scala pro.) The solution looks like this:

import org.apache.spark.sql.functions.col

// cast all the columns to Double type
val df = raw.select(((1 to 28).map(i => "V" + i) ++ Array("Time", "Amount", "Class")).map(s => col(s).cast("Double")): _*)

The link: https://github.com/intel-analytics/analytics-zoo/blob/master/apps/fraudDetection/Fraud%20Detction.ipynb

Jad Gift
  • This can be extended to provide a schema: `val df = raw.select(schema.map{case (s, coltype) => col(s).cast(coltype)}.toList: _*)` where schema is a map collection, e.g. `val schema = Map("evar39" -> "DOUBLE", "evar46" -> "STRING")` – Juergen Jun 18 '19 at 09:45
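Juergen's variant, spelled out as a self-contained sketch (the evar column names are his examples; note that this select keeps only the columns listed in the map):

import org.apache.spark.sql.functions.col

// map each column name to the type it should be cast to
val schema = Map("evar39" -> "DOUBLE", "evar46" -> "STRING")
val df = raw.select(schema.map { case (s, coltype) => col(s).cast(coltype) }.toList: _*)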