26

I have a CSV in which a field is a datetime in a specific format. I cannot import it directly into my Dataframe because it needs to be a timestamp. So I import it as a string and convert it into a Timestamp like this:

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x: Any): Timestamp = {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    if (x.toString() == "") {
        return null
    } else {
        val d = format.parse(x.toString())
        val t = new Timestamp(d.getTime())
        return t
    }
}

def convert(row : Row) : Row = {
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)
}

Is there a better, more concise way to do this, with the Dataframe API or spark-sql? The above method requires creating an RDD and specifying the schema for the Dataframe again.
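
For context, here is roughly what that roundtrip looks like with the convert function above. This is only a sketch; df (the all-string DataFrame read from the CSV), sqlContext, and the field name "dt" are assumptions about the surrounding code:

import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

// Swap the fourth field of the string schema to TimestampType,
// then map over the RDD and hand the schema back to createDataFrame.
val newSchema = StructType(df.schema.fields.updated(3, StructField("dt", TimestampType, nullable = true)))

val converted = sqlContext.createDataFrame(df.rdd.map(convert), newSchema)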

Rodrigue
user568109

7 Answers

57

Spark >= 2.2

Since Spark 2.2 you can provide the format string directly:

import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+
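
Since the question also asks about spark-sql: to_timestamp is available as a SQL function from 2.2 as well, so the same conversion can be written as an expression or a query. A sketch using the same df; the view name events and the spark session name are just examples:

df.selectExpr("id", "dts", "to_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') AS ts").show(2, false)

// Or against a registered view (assuming a SparkSession named spark):
df.createOrReplaceTempView("events")
spark.sql("SELECT *, to_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') AS ts FROM events").show(2, false)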

Spark >= 1.6, < 2.2

You can use the date processing functions introduced in Spark 1.5. Assuming you have the following data:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")

You can use unix_timestamp to parse strings and cast the result to timestamp:

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+

As you can see, it covers both parsing and error handling. The format string should be compatible with Java SimpleDateFormat.

Spark >= 1.5, < 1.6

You'll have to use something like this:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

or

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")

due to SPARK-11724.
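
Either expression plugs into withColumn exactly like the example above; a quick sketch with the same df:

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
df.withColumn("ts", ts)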

Spark < 1.5

You should be able to use these with expr and HiveContext.
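
For those older versions, one hedged way to sketch it is with a HiveContext and plain SQL, reusing the double-cast workaround from above (the example data and the temp table name are assumptions):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
df.registerTempTable("df")

// unix_timestamp here is the Hive UDF; the double cast avoids the
// integer-to-timestamp issue described in SPARK-11724.
hiveContext.sql(
  "SELECT *, CAST(CAST(unix_timestamp(dts, 'MM/dd/yyyy HH:mm:ss') AS double) AS timestamp) AS ts FROM df"
)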

zero323
  • `to_timestamp` seems to be designed to drop millisecond information for some reason. – user2357112 Oct 12 '18 at 17:53
  • @user2357112supportsMonica: yes, you are right. Maybe we have to write a custom udf with java `SimpleDateFormat` to achieve this (see the sketch below), or `date_format` from spark functions is handy in some cases. – Ram Ghadiyaram Mar 12 '20 at 14:58
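
Following up on that comment, here is a minimal sketch of such a custom UDF built on SimpleDateFormat so millisecond precision is kept. The ".SSS" pattern suffix and the "dts" column name are assumptions for illustration:

import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf
import scala.util.Try

val parseWithMillis = udf { s: String =>
  val fmt = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.SSS") // new instance per call; SimpleDateFormat is not thread-safe
  Try(new Timestamp(fmt.parse(s).getTime)).toOption         // None (a null column value) on malformed input
}

df.withColumn("ts", parseWithMillis($"dts"))
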
6

I haven't played with Spark SQL yet, but I think this would be more idiomatic Scala (using null is not considered good practice):

import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.util.{Failure, Success, Try}

def getTimestamp(s: String): Option[Timestamp] = s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }
  }
}

Please notice I assume you know the Row element types beforehand (if you read it from a CSV file, all of them are String); that's why I use a proper type like String and not Any (everything is a subtype of Any).

It also depends on how you want to handle parsing exceptions. In this case, if a parsing exception occurs, a None is simply returned.

You could use it further on with:

rows.map(row => Row(row(0), row(1), row(2), getTimestamp(row.getString(3))))
jarandaf
  • I have done this before. I felt I should address the core issue before moving to such niceties. If there is a better solution, I may not have to do this at all. The problem is with the rows.map, which returns an rdd and will need to be converted into a ddf. So it could be that the ddf api is lacking or I don't know how to do it. – user568109 Apr 24 '15 at 11:08
  • I don't know if there's other way, but you can convert any RDD to a DF with no trouble. In this concrete example with `sqlContext.createDataFrame(rowRDD, schema)`. To me spark sql is nice to query your data in an SQL-like manner, not to parse the data itself (for such thing, use simple RDD's). – jarandaf Apr 24 '15 at 11:38
  • Try(new Timestamp(format.parse(s).getTime)).toOption – nont Jun 29 '16 at 14:40
1

I have ISO8601 timestamps in my dataset and I needed to convert them to the "yyyy-MM-dd" format. This is what I did:

import org.joda.time.{DateTime, DateTimeZone}
object DateUtils extends Serializable {
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))

And you can just use the UDF in your spark SQL query.
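
For example, something like this; the table and column names ("events", "event_time") are made up purely for illustration:

// Register the data under a temp table name, then call the registered UDF in SQL.
df.registerTempTable("events")
val withDates = sqlContext.sql("SELECT formatTimeStamp(event_time) AS event_date FROM events")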

zengr
1

Spark Version: 2.4.4

scala> import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.types.TimestampType

scala> val df = Seq("2019-04-01 08:28:00").toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: string]

scala> val df_mod = df.select($"ts".cast(TimestampType))
df_mod: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df_mod.printSchema()
root
 |-- ts: timestamp (nullable = true)
Aravind Krishnakumar
0

I would like to move the getTimestamp method written by you into rdd's mapPartitions and reuse a GenericMutableRow among rows in an iterator:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val strRdd = sc.textFile("hdfs://path/to/csv-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
  new Iterator[Row] {
    val row = new GenericMutableRow(4)
    var current: Array[String] = _

    def hasNext = iter.hasNext
    def next() = {
      current = iter.next()
      row(0) = current(0)
      row(1) = current(1)
      row(2) = current(2)

      val ts = getTimestamp(current(3))
      if(ts != null) {
        row.update(3, ts)
      } else {
        row.setNullAt(3)
      }
      row
    }
  }
}

And you still need to provide a schema to generate the DataFrame:

val df = sqlContext.createDataFrame(rowRdd, tableSchema)
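
tableSchema is not shown above; here is a minimal sketch of what it could look like (the column names are placeholders, only the fourth field's TimestampType matters here):

import org.apache.spark.sql.types._

val tableSchema = StructType(Seq(
  StructField("c0", StringType, nullable = true),
  StructField("c1", StringType, nullable = true),
  StructField("c2", StringType, nullable = true),
  StructField("dt", TimestampType, nullable = true)
))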

The usage of GenericMutableRow inside an iterator implementation can be found in Aggregate Operator, InMemoryColumnarTableScan, ParquetTableOperations, etc.

yjshen
  • It's very close to my actual code. Also, if you want to parse a csv file, you should probably use spark-csv instead of split. The point I wanted to make is that adding and mutating columns will return you an rdd which will again need to be converted into a ddf by giving a schema. Is there a shorter route? – user568109 Apr 24 '15 at 10:57
  • @user568109, I don't think there is one. Since spark-sql would need a schema, it must get one somehow. If you use RDD[CaseClassX], spark-sql would infer the schema automatically for you from the case class's definition. But what you use here is a Row(Array[Any]); no DataType inference can happen there, so you just pass one. – yjshen Apr 24 '15 at 11:04
  • I think that using one reference, mutating it each time and returning it as reference is a recipe for disaster. Have you actually used this approach successfully? – maasg Apr 24 '15 at 16:51
  • @YijieShen I stand corrected. This "mutableRow" looks like a memory optimization as explained here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala Could you point me to resources where they recommend this approach? Looks like something Catalyst would like to do behind the scenes. +1 – maasg Apr 25 '15 at 10:25
0

I would use https://github.com/databricks/spark-csv

This will infer timestamps for you.

import com.databricks.spark.csv._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

val rdd: RDD[String] = sc.textFile("csvfile.csv")

val df: DataFrame = new CsvParser().withDelimiter('|')
      .withInferSchema(true)
      .withParseMode("DROPMALFORMED")
      .csvRdd(sqlContext, rdd)
mark
0

I had some issues with to_timestamp where it was returning an empty string. After a lot of trial and error, I was able to get around it by casting as a timestamp and then casting back as a string. I hope this helps anyone else with the same issue:

import org.apache.spark.sql.functions.to_timestamp

df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
  val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
  newDf.withColumn(col, conversionFunc)
})