181

Suppose I'm doing something like:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

But I really wanted the year as Int (and perhaps transform some other columns).

The best I could come up with was

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

which is a bit convoluted.

I'm coming from R, and I'm used to being able to write, e.g.

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

I'm likely missing something, since there should be a better way to do this in Spark/Scala...

Peter Mortensen
kevinykuo

23 Answers

155

Edit: Newest newest version

Since Spark 2.x, you should use the Dataset API when working in Scala [1]. See the docs:

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

If you are working in Python, where this is easier, here is the link as well, since this is a very highly voted question:

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]

[1] https://spark.apache.org/docs/latest/sql-programming-guide.html:

In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset to represent a DataFrame.

Edit: Newest version

Since Spark 2.x you can use .withColumn. See the docs:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

Oldest answer

Since Spark version 1.4 you can apply the cast method with DataType on the column:

import org.apache.spark.sql.types.IntegerType
// Cast into a temporary column, drop the original, then rename the copy back.
val df2 = df.withColumn("yearTmp", df("year").cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

If you are using SQL expressions, you can also do:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

For more info check the docs: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

msemelman
  • 4
    why did you use withColumn followed by drop? Isn't it easier to just use withColumn with the original column name? – Ameba Spugnosa Aug 09 '16 at 07:29
  • @AmebaSpugnosa I think by the time I used it Spark crashed if it had repeated column names. Not when you create them, but when you use them. – msemelman Aug 10 '16 at 13:24
  • 5
    there is no need to drop column followed by a rename. You can do in one line `df.withColumn("ctr", temp("ctr").cast(DecimalType(decimalPrecision, decimalScale)))` – ruhong Jul 25 '17 at 03:21
  • 1
    Is an entire new dataframe copy created just to recast a column in this case? Am I missing something? Or perhaps there is some optimization behind the scenes? – user1814008 Aug 09 '17 at 23:56
  • 1
    @user1814008 Maybe you want to check https://stackoverflow.com/questions/30691385/internal-work-of-spark/30691654#30691654 . There you can find an in depth explanation of how spark transformations and actions work and why applying transformations won't necessarily create a new dataframe. – msemelman Sep 04 '17 at 20:26
  • @oneleggedmule Check previous comment. – msemelman Sep 04 '17 at 20:27
  • 6
    Going by the [docs](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame) of `Spark 2.x`, `df.withColumn(..)` can **add or replace** a column depending on the `colName` argument – y2k-shubham May 03 '18 at 13:02
  • Would be nice if this could be automated. – Soerendip Sep 14 '18 at 19:57
  • link is broken... – Z4-tier Dec 12 '21 at 18:39
  • @Z4-tier thank you, updated! – msemelman Feb 07 '22 at 19:40
91

[EDIT: March 2016: thanks for the votes! Though really, this is not the best answer; I think the solutions based on withColumn, withColumnRenamed and cast put forward by msemelman, Martin Senne and others are simpler and cleaner].

I think your approach is OK. Recall that a Spark DataFrame is an (immutable) RDD of Rows, so we're never really replacing a column, just creating a new DataFrame each time with a new schema.

Assuming you have an original df with the following schema:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

And some UDFs defined on one or several columns:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

Changing column types or even building a new DataFrame from another can be written like this:

val featureDf = df
  .withColumn("departureDelay", toDouble(df("DepDelay")))
  .withColumn("departureHour",  toHour(df("CRSDepTime")))
  .withColumn("dayOfWeek",      toInt(df("DayOfWeek")))
  .withColumn("dayOfMonth",     toInt(df("DayofMonth")))
  .withColumn("month",          toInt(df("Month")))
  .withColumn("distance",       toDouble(df("Distance")))
  .withColumn("nearestHoliday", days_since_nearest_holidays(
    df("Year"), df("Month"), df("DayofMonth")))
  .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
          "month", "distance", "nearestHoliday")

which yields:

scala> featureDf.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

This is pretty close to your own solution. Keeping the type changes and other transformations as separate udf vals simply makes the code more readable and reusable.
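The converters above throw on a null or malformed string, which fails the whole job. A minimal sketch of a more defensive variant follows, assuming Spark 2.x or later with a local SparkSession; the name toIntSafe and the sample rows are hypothetical. It wraps the parse in Try and returns an Option, which Spark stores as a nullable column. For a plain type change, cast is still the better tool, because a UDF is opaque to the optimizer.

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("safe-udf-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: Option(s) guards against null input, Try guards
// against malformed input. A None result is stored as SQL NULL.
val toIntSafe = udf((s: String) => Option(s).flatMap(v => Try(v.trim.toInt).toOption))

// Hypothetical sample: one good value, one malformed value, one null.
val delays = Seq((1, "15"), (2, "n/a"), (3, null)).toDF("id", "DepDelay")

// The malformed and null inputs become null instead of raising an exception.
val parsed = delays.withColumn("departureDelay", toIntSafe($"DepDelay"))
parsed.show()
```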

Svend
  • 32
    This is neither safe nor efficient. __Not safe__ because a single `NULL` or malformed entry will crash a whole job. __Not efficient__ because UDFs are not transparent to Catalyst. Using UDFs for complex operations is just fine, but there is no reason to use these for basic type casting. This is why we have the `cast` method (see [an answer by Martin Senne](http://stackoverflow.com/a/32634826/1560062)). Making things transparent to Catalyst requires more work but basic safety is just a matter of putting `Try` and `Option` to work. – zero323 Mar 01 '16 at 00:39
  • I didn't see anything related to converting string to date for example "05-APR-2015" – dbspace Apr 29 '16 at 21:33
  • 4
    Is there a way to reduce your `withColumn()` section to a generic one that iterates through all columns? – Boern May 17 '16 at 14:55
  • Thanks zero323, upon reading this I figured why the udf solution here crashes. Some comments are better than some answers on SO :) –  Oct 14 '18 at 12:11
  • Is there any way in which we can get to know the corrupt row, means records which are having columns of wrong data types during casting. As cast function makes those fields as null – Etisha Jun 24 '19 at 08:52
71

As the cast operation is available for Spark Columns (and as I personally do not favour UDFs as proposed by @Svend at this point), how about:

df.select( df("year").cast(IntegerType).as("year"), ... )

to cast to the requested type? As a neat side effect, values that cannot be cast ("converted") will become null.
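Since failed casts turn into null silently, it can help to list the rows that were affected. The following is a sketch only, assuming Spark 2.x or later with ANSI mode off (with ANSI mode on, an invalid cast raises an error instead of producing null); the column name yearInt and the sample rows are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cast-null-sketch")
  // Assumption: ANSI mode is off, so an invalid cast yields null.
  .config("spark.sql.ansi.enabled", "false")
  .getOrCreate()
import spark.implicits._

// Hypothetical rows; "19x7" cannot be parsed as an integer.
val cars = Seq(("2012", "Tesla"), ("19x7", "Ford")).toDF("year", "make")

// Keep the original column next to the cast one so the two can be compared.
val casted = cars.withColumn("yearInt", col("year").cast(IntegerType))

// Rows whose original value was present but did not survive the cast.
val rejected = casted.filter(col("year").isNotNull && col("yearInt").isNull)
rejected.show()
```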

In case you need this as a helper method, use:

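import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

object DFHelper {
  // Replaces column `cn` with a copy cast to `tpe`; name and position are kept.
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}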

which is used like:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
danday74
Martin Senne
  • 2
    Can you advice me on how to proceed, if I need to cast and rename a whole bunch of columns (I have 50 columns, and fairly new to scala, not sure what is the best way to approach it without creating a massive duplication)? Some columns should stay String, some should be cast to Float. – Dmitry Smirnov Apr 14 '16 at 13:03
  • how to convert a String to a Date for example "25-APR-2016" in the column and "20160302" – dbspace Apr 29 '16 at 21:30
  • @DmitrySmirnov Did you ever get an answer? I have the same question. ;) – Evan Zamir Mar 22 '17 at 18:33
  • @EvanZamir unfortunately not, I ended up doing a shitton of operations to be able to use data as rdd in other steps. I wonder if this became easier these days :) – Dmitry Smirnov Mar 23 '17 at 11:34
64

First, if you want to cast the type, use this:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

If you reuse the same column name, the column is replaced with the new one. You don't need separate add and delete steps.
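A quick way to convince yourself of this is to compare the column list and schema before and after. The sketch below assumes Spark 2.x or later and uses a made-up one-row DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("withcolumn-replace-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample with every column read as a string.
val cars = Seq(("2012", "Tesla", "S")).toDF("year", "make", "model")

// Reusing the name "year" replaces the column in place.
val typed = cars.withColumn("year", $"year".cast(IntegerType))

// Same columns in the same order; only the type of "year" differs.
println(typed.columns.mkString(", "))
typed.printSchema()
```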

Second, about Scala vs. R.
This is the code most similar to R that I can come up with:

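import org.apache.spark.sql.functions
import org.apache.spark.sql.types.IntegerType

// Pattern-match on each column name; unmatched columns pass through unchanged.
val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)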

The code is a little longer than R's, but that has nothing to do with the verbosity of the language. In R, mutate is a special function for R data frames, while in Scala you can easily build an ad hoc equivalent thanks to the language's expressive power.
In short, Scala avoids special-purpose solutions because the language design is good enough for you to quickly and easily build your own domain language.


Side note: df.columns is, surprisingly, an Array[String] instead of an Array[Column]; maybe they wanted it to look like a Python pandas DataFrame.

WeiChing 林煒清
  • 1
    Could you please give the equivalent for pyspark? – Harit Vishwakarma Oct 03 '15 at 18:06
  • 1
    I am getting "illegal start of definition" .withColumn("age", $"age".cast(sql.types.DoubleType)) for my "age" field. Any suggestion? – BlueDolphin Nov 24 '17 at 04:14
  • Do you have to .cache() the data frame if we are doing these conversions on many columns for performance reason, or is it not required as Spark optimizes them? – skjagini Mar 06 '19 at 21:36
  • The import can be `import org.apache.spark.sql.types._` and then instead of `sql.types.IntegerType` just `IntegerType`. – nessa.gp May 06 '20 at 14:00
20

You can use selectExpr to make it a little cleaner:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")
dnlbrky
13

Java code for changing the data type of a DataFrame column from String to Integer:

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

It simply casts the existing column (String data type) to Integer.

manishbelsare
  • 1
    There's no `DataTypes` in `sql.types`! it's `DataType`. Moreover, one can simply import `IntegerType` and cast. – Ehsan M. Kermani Jul 13 '16 at 17:31
  • @EhsanM.Kermani actually DataTypes.IntegerType is a legit reference. – Cupitor Jun 21 '17 at 22:56
  • 1
    @Cupitor `DataTypes.IntegerType` used to be in [DeveloperAPI mode](https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/types/IntegerType.html) and it's [stable in v.2.1.0](https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/types/IntegerType.html) – Ehsan M. Kermani Jun 21 '17 at 23:31
  • This is the best solution! –  Oct 14 '18 at 12:09
11

I think this is a lot more readable.

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

This will convert your year column to IntegerType without creating any temporary columns and dropping them afterwards. If you want to convert to any other data type, you can check the types inside the org.apache.spark.sql.types package.

Piyush Patel
8

To convert the year from string to int, you can add the following option to the csv reader: "inferSchema" -> "true", see DataBricks documentation
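As a rough sketch of that option with the CSV reader built into Spark 2.0 and later (an assumption; the question uses the older com.databricks.spark.csv package), the snippet below writes a small temporary file with made-up contents and reads it back with schema inference turned on. Inference needs an extra pass over the data, so an explicit schema may be preferable for large files.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("infer-schema-sketch")
  .getOrCreate()

// Write a tiny sample file so the sketch is self-contained (contents are hypothetical).
val csvPath = Files.createTempFile("cars", ".csv")
Files.write(csvPath, "year,make,model\n2012,Tesla,S\n1997,Ford,E350\n".getBytes(StandardCharsets.UTF_8))

val cars = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // the reader samples the data to pick column types
  .csv(csvPath.toString)

// "year" is inferred as an integer column; "make" and "model" stay strings.
cars.printSchema()
```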

Peter Rose
8

Generate a simple dataset containing five values and convert int to string type:

import org.apache.spark.sql.functions.col
val df = spark.range(5).select( col("id").cast("string") )
Andrej Kesely
user8106134
6

This only really applies if you're having issues saving through a JDBC driver such as SQL Server's, but it's really helpful for the syntax and type errors you will run into.

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
ben jarman
  • Can you help me implement the same code in Java ? and how to register the customJdbcDialect into DataFrame – abhijitcaps Feb 16 '17 at 13:47
  • Nice one I did the same with Vertica, but since spark 2.1. JDbcUtil you need to implement only the specific datatype you need. dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse( throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.simpleString}")) – Arnon Rodman Feb 18 '18 at 16:20
5

A note on the answers that suggest using cast: the cast method in Spark 1.4.1 is broken.

For example, a DataFrame with a string column holding the value "8182175552014127960" has the value "8182175552014128100" after being cast to bigint:

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

We ran into a lot of issues before finding this bug, because we had bigint columns in production.
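If you want to check whether the Spark version you run is affected, one option is a round trip: cast the string to bigint, cast it back to string, and compare with the original. This is only a sketch, assuming a Spark 2.x or later API for the session setup; the column name back is made up. On an unaffected version the mismatch set is empty.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("bigint-roundtrip-sketch")
  .getOrCreate()
import spark.implicits._

// The value reported above; it fits comfortably in a 64-bit long.
val ids = Seq("8182175552014127960").toDF("a")

// Cast to bigint and back to string so the two representations can be compared.
val roundTrip = ids.selectExpr("a", "cast(cast(a as bigint) as string) as back")

// Any row listed here lost digits during the cast.
val mismatches = roundTrip.filter($"a" =!= $"back")
mismatches.show(truncate = false)
```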

lxg
sauraI3h
5
df.select($"long_col".cast(IntegerType).as("int_col"))
soulmachine
4

You can use the code below.

df.withColumn("year", df("year").cast(IntegerType))

This converts the year column to an IntegerType column.

CodeChanger
adarsh
4

With Spark SQL 2.4.0 you can do this:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
Eric Bellet
2

This method drops the old column and creates a new column with the same values and a new data type. My original data types when the DataFrame was created were:

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

After this I ran following code to change the datatype:-

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

After this my result came out to be:-

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)
PirateJack
2

Another solution is as follows:

1) Keep "inferSchema" as False

2) While running 'Map' functions on the row, you can read 'asString' (row.getString...)

//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema","false")
            .load(args[0]);

JavaRDD<Box> vertices = enginesDataSet
            .select("BOX","BOX_CD")
            .toJavaRDD()
            .map(new Function<Row, Box>() {
                @Override
                public Box call(Row row) throws Exception {
                    return new Box((String)row.getString(0),(String)row.get(1));
                }
            });
Vibha
2

There are many answers but few thorough explanations.

The following syntax works in a Databricks notebook with Spark 2.4:

from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(df["LOAD_DATE"], "MM-dd-yyyy"))

Note that you have to specify the format of your input (in my case "MM-dd-yyyy"), and the import is mandatory because to_date is a Spark SQL function.

I also tried this syntax, but got nulls instead of a proper cast:

df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))

(Note that I had to use brackets and quotes for it to be syntactically correct, though.)
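For Scala users, the same contrast can be sketched as follows, assuming Spark 2.2 or later (where to_date accepts a format string) and ANSI mode off; the sample value and the column names viaToDate and viaCast are made up. A plain cast to date only understands ISO-style strings such as "2016-04-25", which would explain the nulls.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("to-date-sketch")
  // Assumption: ANSI mode is off, so an unparseable cast yields null.
  .config("spark.sql.ansi.enabled", "false")
  .getOrCreate()
import spark.implicits._

// Hypothetical input in month-day-year order.
val loads = Seq("04-25-2016").toDF("LOAD_DATE")

val parsed = loads
  // Parsing with an explicit pattern understands the non-ISO layout.
  .withColumn("viaToDate", to_date(col("LOAD_DATE"), "MM-dd-yyyy"))
  // A plain cast does not recognise this layout and produces null.
  .withColumn("viaCast", col("LOAD_DATE").cast("date"))

parsed.show()
```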


PS: I have to admit this is a syntax jungle: there are many possible entry points, and the official API references lack proper examples.

Mehdi LAMRANI
2

Why not just do as described under http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast

df.select(df.year.cast("int"),"make","model","comment","blank")
1

You can change the data type of a column by using cast in Spark SQL. Suppose the table is named table, it has only two columns, column1 and column2, and the data type of column1 is to be changed:

spark.sql("select cast(column1 as Double) column1NewName,column2 from table")

In place of Double, write your data type.

1

Another way:

// Generate a simple dataset containing five values and convert int to string type

val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
aschipfl
user8106134
1

In case you have to cast dozens of columns identified by name, the following example takes the approach of @dnlbrky and applies it to several columns at once:

df.selectExpr(df.columns.map(cn => {
    if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
    else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
    else cn
}):_*)

Uncasted columns are kept unchanged. All columns stay in their original order.
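The same idea can be written with Column objects instead of SQL strings, driven by a lookup table from column name to target type. This is a sketch under assumptions: Spark 2.x or later, and the map targetTypes plus the sample row are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("multi-cast-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical lookup table: which column should end up with which type.
val targetTypes: Map[String, DataType] =
  Map("speed" -> DoubleType, "isActive" -> BooleanType)

// Hypothetical sample with every column read as a string.
val raw = Seq(("12.5", "true", "bike")).toDF("speed", "isActive", "name")

// Cast the listed columns, pass the others through, and keep the original order.
val typed = raw.select(raw.columns.map { name =>
  targetTypes.get(name) match {
    case Some(tpe) => col(name).cast(tpe).as(name)
    case None      => col(name)
  }
}: _*)

typed.printSchema()
```

Keeping the types in a Map means the list of columns can be built from configuration or from an existing schema rather than being hard-coded in nested if/else branches.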

cubic lettuce
0
    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
    //Schema to be applied to the table
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
Aravind Krishnakumar
0

If you want to change multiple columns of a specific type to another type without specifying individual column names:

/* Get names of all columns that you want to change type. 
In this example I want to change all columns of type Array to String*/
    val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)

//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}

//display

updatedDataFrame.show(truncate = false)
Ravi