
I have my timestamp in UTC and ISO 8601 format, but when using Structured Streaming it gets automatically converted to local time. Is there a way to stop this conversion? I would like to keep it in UTC.

I'm reading JSON data from Kafka and then parsing it with the from_json Spark function.

Input:

{"Timestamp":"2015-01-01T00:00:06.222Z"}

Flow:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();

Schema:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true)
});

Output:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+

As you can see, the hour has shifted forward by one on its own.

PS: I tried to experiment with the from_utc_timestamp Spark function, but no luck.

– Martin Brisiak

4 Answers


For me it worked to use:

spark.conf.set("spark.sql.session.timeZone", "UTC")

It tells Spark SQL to use UTC as the default time zone for timestamps. I used it in Spark SQL, for example:

select *, cast('2017-01-01 10:10:10' as timestamp) from someTable

I know it does not work in Spark 2.0.1, but it works in Spark 2.2. I used it in SQLTransformer as well and it worked.

I am not sure about streaming though.
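
A minimal sketch of wiring this into the asker's flow when building the session (Scala here; whether the setting also governs how the console sink renders timestamps depends on the Spark version, so treat it as something to verify):

    import org.apache.spark.sql.SparkSession

    // Sketch: set the SQL session time zone to UTC up front, before any
    // streaming query is started, so timestamp columns are interpreted and
    // rendered relative to UTC rather than the local JVM time zone.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("my-app")
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()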

– astro_asz
  • Just to be clear, without changing `spark.sql.session.timeZone`, does Spark convert timestamps (without timezone) to my local time zone and store the converted timestamp when importing from a CSV, or does it do the conversion only when I run a `select` and write the results to the console (or Kafka etc.)? – lfk Jan 02 '19 at 23:40
  • How can I set it to local time? Whatever I try gives me UTC. The question I posted -> https://stackoverflow.com/questions/58143743/how-to-get-the-current-local-time-or-system-time-in-spark-scala-dataframe/58143801#58143801 Please help me with this. – ss301 Sep 28 '19 at 15:04
  • Isn't the default value of `spark.sql.session.timeZone` UTC? – Dev Oct 07 '19 at 05:56

Note:

This answer is useful primarily in Spark < 2.2. For newer Spark versions, see the answer by astro_asz.

However, we should note that as of Spark 2.4.0, spark.sql.session.timeZone doesn't set user.timezone (java.util.TimeZone.getDefault). So setting spark.sql.session.timeZone alone can result in a rather awkward situation where SQL and non-SQL components use different time zone settings.

Therefore I still recommend setting user.timezone explicitly, even if spark.sql.session.timeZone is set.

TL;DR Unfortunately this is how Spark handles timestamps right now and there is really no built-in alternative, other than operating on epoch time directly, without using date/time utilities.

You can find an insightful discussion on the Spark developers list: SQL TIMESTAMP semantics vs. SPARK-18350
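
For illustration, operating on epoch time directly could look like the sketch below (names such as `parsed` and `epoch_ms` are placeholders standing in for the asker's parsed stream; the point is that `java.time.Instant` is unaffected by the JVM or session time zone):

    import java.time.Instant
    import org.apache.spark.sql.functions.{col, udf}

    // Sketch: keep the Kafka field as a plain string and convert it to epoch
    // milliseconds with java.time.Instant, which parses the trailing 'Z' as UTC
    // regardless of any time zone setting. Downstream logic then works on
    // plain longs instead of TimestampType.
    val toEpochMillis = udf((iso: String) => Instant.parse(iso).toEpochMilli)

    val withEpoch = parsed.withColumn("epoch_ms", toEpochMillis(col("Timestamp")))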

The cleanest workaround I've found so far is to set -Duser.timezone to UTC for both the driver and executors. For example, on the command line:

bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
                --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"

or by adjusting configuration files (spark-defaults.conf):

spark.driver.extraJavaOptions      -Duser.timezone=UTC
spark.executor.extraJavaOptions    -Duser.timezone=UTC
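
Following the note above, a sketch of setting both knobs from application code (illustrative only; TimeZone.setDefault changes the driver JVM only, so the executor option shown above is still needed on a real cluster):

    import java.util.TimeZone
    import org.apache.spark.sql.SparkSession

    // Sketch: align the JVM default (user.timezone) with the SQL session time
    // zone so SQL and non-SQL code paths agree on UTC.
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

    val spark = SparkSession.builder()
      .config("spark.sql.session.timeZone", "UTC")
      .config("spark.executor.extraJavaOptions", "-Duser.timezone=UTC")
      .getOrCreate()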
– zero323
  • I would also add that if you want to use other time zones, you can't use `+` or `-` with UTC (like UTC+2, UTC-1), but it works with GMT (GMT+2, GMT-1 works). – Shikkou Jul 08 '19 at 11:15

Although two very good answers have been provided, I found them both to be a bit of a heavy hammer to solve the problem. I did not want anything that would require modifying time zone parsing behavior across the whole app, or an approach that would alter the default time zone of my JVM. I did find a solution after much pain, which I will share below...

Parsing time[/date] strings into timestamps for date manipulations, then correctly rendering the result back

First, let's address the issue of how to get Spark SQL to correctly parse a date[/time] string (given a format) into a timestamp and then properly render that timestamp back out so it shows the same date[/time] as the original string input. The general approach is:

- convert a date[/time] string to a timestamp [via to_timestamp]
    [to_timestamp seems to assume the date[/time] string represents a time relative to UTC (GMT time zone)]
- relativize that timestamp to the time zone we are in via from_utc_timestamp

The test code below implements this approach. The 'timezone we are in' is passed as the argument to the timeTricks method. The code converts the input string "1970-01-01" to localizedTimestamp (via from_utc_timestamp) and verifies that the valueOf of that timestamp matches the expected local rendering (e.g. "1970-01-01 00:00:00" for UTC).

object TimeTravails {
  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    import java.sql.Timestamp

    def timeTricks(timezone: String): Unit =  {
      val df2 = List("1970-01-01").toDF("timestr"). // can use to_timestamp even without time parts !
        withColumn("timestamp", to_timestamp('timestr, "yyyy-MM-dd")).
        withColumn("localizedTimestamp", from_utc_timestamp('timestamp, timezone)).
        withColumn("weekday", date_format($"localizedTimestamp", "EEEE"))
      val row = df2.first()
      println("with timezone: " + timezone)
      df2.show()
      val (timestamp, weekday) = (row.getAs[Timestamp]("localizedTimestamp"), row.getAs[String]("weekday"))

      timezone match {
        case "UTC" =>
          assert(timestamp ==  Timestamp.valueOf("1970-01-01 00:00:00")  && weekday == "Thursday")
        case "PST" | "GMT-8" | "America/Los_Angeles"  =>
          assert(timestamp ==  Timestamp.valueOf("1969-12-31 16:00:00")  && weekday == "Wednesday")
        case  "Asia/Tokyo" =>
          assert(timestamp ==  Timestamp.valueOf("1970-01-01 09:00:00")  && weekday == "Thursday")
      }
    }

    timeTricks("UTC")
    timeTricks("PST")
    timeTricks("GMT-8")
    timeTricks("Asia/Tokyo")
    timeTricks("America/Los_Angeles")
  }
}

Solution to the problem of Structured Streaming interpreting incoming date[/time] strings as UTC (not local time)

The code below illustrates how to apply the above tricks (with a slight modification) so as to correct the problem of timestamps being shifted by the offset between local time and GMT.

object Struct {
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  def main(args: Array[String]): Unit = {

    val timezone = "PST"

    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    import spark.implicits._


    val splitDf = df.select(split(df("value"), " ").as("arr")).
      select($"arr" (0).as("tsString"), $"arr" (1).as("count")).
      withColumn("timestamp", to_timestamp($"tsString", "yyyy-MM-dd"))
    val grouped = splitDf.groupBy(window($"timestamp", "1 day", "1 day").as("date_window")).count()

    val tunedForDisplay =
      grouped.
        withColumn("windowStart", to_utc_timestamp($"date_window.start", timezone)).
        withColumn("windowEnd", to_utc_timestamp($"date_window.end", timezone))

    tunedForDisplay.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

The code requires input to be fed via a socket... I use the program 'nc' (netcat), started like this:

nc -l 9999

Then I start the Spark program and provide netcat with one line of input:

1970-01-01 4

The output I get illustrates the problem with the offset shift:

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-------------------+-------------------+
|date_window                               |count|windowStart        |windowEnd          |
+------------------------------------------+-----+-------------------+-------------------+
|[1969-12-31 16:00:00, 1970-01-01 16:00:00]|1    |1970-01-01 00:00:00|1970-01-02 00:00:00|
+------------------------------------------+-----+-------------------+-------------------+

Note that the start and end of date_window are shifted by eight hours from the input (because I am in the GMT-7/8 time zone, PST). However, I correct this shift using to_utc_timestamp to get the proper start and end date times for the one-day window that subsumes the input: 1970-01-01 00:00:00, 1970-01-02 00:00:00.

Note that in the first block of code presented we used from_utc_timestamp, whereas for the structured streaming solution we used to_utc_timestamp. I have yet to figure out which of these two to use in a given situation. (Please clue me in if you know!).
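
As a rough illustration of the documented semantics (a standalone sketch, assuming a SparkSession named `spark` is available): `from_utc_timestamp(ts, tz)` treats the timestamp's wall-clock value as UTC and renders it in `tz`, while `to_utc_timestamp(ts, tz)` treats it as wall-clock time in `tz` and renders it in UTC.

    import org.apache.spark.sql.functions._

    // Sketch: probe both functions with the same literal wall-clock value.
    val ts = to_timestamp(lit("1970-01-01 00:00:00"))

    spark.range(1).select(
      from_utc_timestamp(ts, "PST").as("from_utc"), // renders as 1969-12-31 16:00:00
      to_utc_timestamp(ts, "PST").as("to_utc")      // renders as 1970-01-01 08:00:00
    ).show(false)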

– Chris Bedford

Another solution that worked for me was to set the JVM default time zone to your target time zone (UTC in your case).

TimeZone.setDefault(TimeZone.getTimeZone("UTC"));

I added the above code before writing my Spark dataframe to the database.

– itsajitsharma