I have an RDD containing a timestamp named time of type long:

root
 |-- id: string (nullable = true)
 |-- value1: string (nullable = true)
 |-- value2: string (nullable = true)
 |-- time: long (nullable = true)
 |-- type: string (nullable = true)

I am trying to group by value1, value2 and time as YYYY-MM-DD. I tried to group by cast(time as Date) but then I got the following error:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.RuntimeException: [1.21] failure: ``DECIMAL'' expected but identifier Date found

Does that mean there is no way to group by a date? I even tried adding another level of casting to get it as a String:

cast(cast(time as Date) as String)

Which returns the same error.

I've read that I could probably use aggregateByKey on the RDD, but I don't understand how to use it for several columns and convert that long to a YYYY-MM-DD String. How should I proceed?

galex

3 Answers

I solved the issue by adding this function:

// Format epoch milliseconds as a yyyy-MM-dd string.
def convert(time: Long): String = {
  val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd")
  sdf.format(new java.util.Date(time))
}

And registering it into the sqlContext like this:

sqlContext.registerFunction("convert", convert _)

Then I could finally group by date:

select value1, value2, convert(time), count(*) from table group by value1, value2, convert(time)
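
A possible variation on the function above, shown only as a sketch: `SimpleDateFormat` formats in the JVM's default time zone unless told otherwise, so the same `time` value can land on different days on machines in different zones. The `DayKey` object and the `zoneId` parameter are names introduced here for illustration, and the sketch assumes `time` holds milliseconds since the epoch, which is what `new java.util.Date(time)` expects.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

object DayKey {
  // Formats epoch milliseconds as yyyy-MM-dd in an explicit time zone.
  // A new SimpleDateFormat is created per call because the class is not thread-safe.
  def convert(time: Long, zoneId: String = "UTC"): String = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
    sdf.setTimeZone(TimeZone.getTimeZone(zoneId))
    sdf.format(new Date(time))
  }

  def main(args: Array[String]): Unit = {
    val t = 1430838439000L // 2015-05-05T15:07:19Z
    println(convert(t))               // day in UTC
    println(convert(t, "Asia/Tokyo")) // day in a zone nine hours ahead of UTC
  }
}
```

To register it as a SQL function, it would need to be wrapped in a one-argument function, for example `(t: Long) => DayKey.convert(t)`.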
galex
  • Note that for newer versions of Spark, registerFunction should be changed to sqlContext.udf.register("convert", convert _) – jmnwong Apr 23 '16 at 19:49

I'm using Spark 1.4.0 and since 1.2.0 DATE appears to be present in the Spark SQL API (SPARK-2562). DATE should allow you to group by the time as YYYY-MM-DD.

I also have a similar data structure, where my created_on is analogous to your time field.

root
|-- id: long (nullable = true)
|-- value1: long (nullable = true)
|-- created_on: long (nullable = true)

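I solved it using FROM_UNIXTIME(created_on,'yyyy-MM-dd'), which works well. Note that FROM_UNIXTIME expects seconds since the epoch, and that the year must be written as lowercase yyyy (uppercase YYYY is the week year in Java date patterns):

val countQuery = "SELECT FROM_UNIXTIME(created_on,'yyyy-MM-dd') as `date_created`, COUNT(*) AS `count` FROM user GROUP BY FROM_UNIXTIME(created_on,'yyyy-MM-dd')"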

From here on you can do the normal operations, execute the query into a dataframe and so on.

FROM_UNIXTIME probably worked because I have Hive included in my Spark installation and it's a Hive UDF. However, it will be included as part of the Spark SQL native syntax in future releases (SPARK-8175).
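
One detail worth checking in the format string: if the pattern is interpreted with `java.text.SimpleDateFormat` rules (an assumption here; it is the class Hive's `from_unixtime` has traditionally relied on), uppercase `Y` means week year and lowercase `y` means calendar year. The two agree for most of the year and differ around New Year. A small sketch in plain Scala, independent of Spark; the `PatternCheck` name is hypothetical:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

object PatternCheck {
  // Formats epoch milliseconds in UTC with the given SimpleDateFormat pattern.
  def format(pattern: String, millis: Long): String = {
    val sdf = new SimpleDateFormat(pattern, Locale.US)
    sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
    sdf.format(new Date(millis))
  }

  def main(args: Array[String]): Unit = {
    val dec29 = 1419854400000L // 2014-12-29T12:00:00Z
    println(format("yyyy-MM-dd", dec29)) // calendar year
    println(format("YYYY-MM-dd", dec29)) // week year
  }
}
```

With a US locale this prints 2014-12-29 and then 2015-12-29, because the week containing 1 January 2015 already counts as the first week of 2015.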

josemrivera
  • [SPARK-8175](https://issues.apache.org/jira/browse/SPARK-8175) is now resolved and will be in Spark 1.5.0. – nfo Aug 20 '15 at 08:58

Not sure if this is what you meant or needed, but I've had the same struggle dealing with dates and timestamps in spark-sql. The only thing I came up with was casting a string to a timestamp, since it seems impossible (to me) to have a Date type in spark-sql.

Anyway, this is my code to do something similar to what you need (with a Long in place of a String):

val mySQL = sqlContext.sql(
  "select cast(yourLong as timestamp) as time_cast, count(1) total " +
  "from logs " +
  "group by cast(yourLong as timestamp)"
)
val result = mySQL.map(x => (x(0).toString, x(1).toString))

and the output is something like this:

(2009-12-18 10:09:28.0,7)
(2009-12-18 05:55:14.0,1)
(2009-12-18 16:02:50.0,2)
(2009-12-18 09:32:32.0,2)

Could this be useful for you as well even though I'm using timestamp and not Date?

Hope it helps

FF

EDIT: in order to test a "single-cast" from Long to Timestamp I've tried this simple change:

val mySQL = sqlContext.sql(
  "select cast(1430838439 as timestamp) as time_cast, count(1) total " +
  "from logs " +
  "group by cast(1430838439 as timestamp)"
)
val result = mySQL.map(x => (x(0), x(1)))

and all worked fine with the result:

(1970-01-17 14:27:18.439,4)  // 4 because I have 4 rows in my table
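
The 1970 date in this result is what you get when 1430838439 is read as milliseconds since the epoch; read as seconds, the same number falls in May 2015. A quick sketch in plain Scala with `java.time` (Java 8 or later), outside Spark, for checking which unit a long column holds. The `EpochUnits` name is hypothetical.

```scala
import java.time.Instant

object EpochUnits {
  // Interprets the value as milliseconds since 1970-01-01T00:00:00Z.
  def asMillis(v: Long): Instant = Instant.ofEpochMilli(v)

  // Interprets the value as seconds since 1970-01-01T00:00:00Z.
  def asSeconds(v: Long): Instant = Instant.ofEpochSecond(v)

  def main(args: Array[String]): Unit = {
    val v = 1430838439L
    println(asMillis(v))  // 1970-01-17T13:27:18.439Z
    println(asSeconds(v)) // 2015-05-05T15:07:19Z
  }
}
```

The millisecond reading matches the result above once a one-hour local time zone offset is taken into account.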
Fabio Fantoni
  • Thank you, really instructive but my field is of type long which is a timestamp so it doesn't fit exactly... I need to convert it first – galex May 05 '15 at 14:54
  • I've tried to "single cast" long to timestamp inside spark-sql and I haven't noticed any problem – Fabio Fantoni May 05 '15 at 15:15
  • I've just edited my answer with a simple extra example using a Long type inside my query – Fabio Fantoni May 05 '15 at 15:24
  • If you're still dealing with this problem, could you please post your code so we can understand what gives you that error? – Fabio Fantoni May 06 '15 at 08:04
  • Grouping by the timestamp works but produces no actual grouping, because the timestamp of every row is different. I solved this by converting the long to a string in yyyy-MM-dd format and grouping on that. – galex May 06 '15 at 09:07
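
The point made in the last comment, that rows only group once the timestamp is truncated to a day, can be sketched in memory without Spark. This is an illustration under assumptions, not the code from the thread: the `Event` case class is a hypothetical row shape mirroring the value1, value2 and time columns, `time` is assumed to be in milliseconds, and days are computed in UTC.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

object GroupByDay {
  // Hypothetical row shape mirroring the value1, value2 and time columns.
  case class Event(value1: String, value2: String, time: Long)

  // Truncates epoch milliseconds to a calendar day in UTC.
  def day(millis: Long): LocalDate =
    Instant.ofEpochMilli(millis).atZone(ZoneOffset.UTC).toLocalDate

  // Counts events per (value1, value2, yyyy-MM-dd) key.
  def countPerDay(events: Seq[Event]): Map[(String, String, String), Int] =
    events
      .groupBy(e => (e.value1, e.value2, day(e.time).toString))
      .map { case (key, rows) => key -> rows.size }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("a", "x", 1430838439000L), // 2015-05-05T15:07:19Z
      Event("a", "x", 1430842039000L), // one hour later, same day
      Event("a", "x", 1430924839000L)  // 24 hours after the first, next day
    )
    countPerDay(events).foreach(println)
  }
}
```

Grouping these three events by the raw `time` value would give three groups of one; grouping by day gives two groups, with counts 2 and 1.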