(Tested on Spark 2.2 and 2.3)
I am using Spark to aggregate stock trading ticks into daily OHLC (open-high-low-close) records.
The input data looks like this:
val data = Seq(("2018-07-11 09:01:00", 34.0), ("2018-07-11 09:04:00", 32.0), ("2018-07-11 09:02:00", 35.0), ("2018-07-11 09:03:00", 30.0), ("2018-07-11 09:00:00", 33.0), ("2018-07-12 09:01:00", 56.0), ("2018-07-12 09:04:00", 54.0), ("2018-07-12 09:02:00", 51.0), ("2018-07-12 09:03:00", 50.0), ("2018-07-12 09:00:00", 51.0)).toDF("time", "price")
data.createOrReplaceTempView("ticks")
data.show
shown as
+-------------------+-----+
| time|price|
+-------------------+-----+
|2018-07-11 09:01:00| 34.0|
|2018-07-11 09:04:00| 32.0|
|2018-07-11 09:02:00| 35.0|
|2018-07-11 09:03:00| 30.0|
|2018-07-11 09:00:00| 33.0|
|2018-07-12 09:01:00| 56.0|
|2018-07-12 09:04:00| 54.0|
|2018-07-12 09:02:00| 51.0|
|2018-07-12 09:03:00| 50.0|
|2018-07-12 09:00:00| 51.0|
+-------------------+-----+
Desired output is
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
There have been many SQL solutions such as this and this.
SELECT
TO_DATE(time) AS date,
FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS open,
MAX(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS high,
MIN(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS low,
LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS close
FROM ticks
Due to the limitations of standard SQL, these solutions are cumbersome.
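For reference, here is roughly what a working window-based version looks like (a sketch of my own, not taken from those answers): it needs an explicit full-partition frame for LAST_VALUE plus a DISTINCT to collapse the per-row window results, which is exactly what makes it cumbersome.
spark.sql("""
  SELECT DISTINCT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS open,
    MAX(price) OVER (PARTITION BY TO_DATE(time)) AS high,
    MIN(price) OVER (PARTITION BY TO_DATE(time)) AS low,
    -- LAST_VALUE needs a full-partition frame; the default frame ends at the current row
    LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS close
  FROM ticks
""").show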
Today, I found that Spark SQL allows FIRST_VALUE and LAST_VALUE in a GROUP BY context, which is not allowed in standard SQL.
This relaxation in Spark SQL leads to a neat and tidy solution like this:
SELECT
TO_DATE(time) AS date,
FIRST_VALUE(price) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST_VALUE(price) AS close
FROM ticks
GROUP BY TO_DATE(time)
You can try it
spark.sql("SELECT TO_DATE(time) AS date, FIRST(price) AS open, MAX(price) AS high, MIN(price) AS low, LAST(price) AS close FROM ticks GROUP BY TO_DATE(time)").show
shown as
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|34.0|35.0|30.0| 33.0|
|2018-07-12|56.0|56.0|50.0| 51.0|
+----------+----+----+----+-----+
However, the above result is incorrect. (Please compare with the above desired result.)
FIRST_VALUE and LAST_VALUE need a deterministic ordering to get deterministic results.
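To see the non-determinism yourself, you can reshuffle the input before the same aggregation; this is only a sketch, and because no ordering is guaranteed, the open/close values it prints may change from run to run.
import org.apache.spark.sql.functions._
// Reshuffling the ticks before the same GROUP BY may change which rows FIRST/LAST pick up.
data.repartition(8).groupBy(expr("TO_DATE(time)").as("date")).agg(first("price").as("open"), max("price").as("high"), min("price").as("low"), last("price").as("close")).show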
I can correct it by adding an orderBy before grouping.
import org.apache.spark.sql.functions._
data.orderBy("time").groupBy(expr("TO_DATE(time)").as("date")).agg(first("price").as("open"), max("price").as("high"), min("price").as("low"), last("price").as("close")).show
shown as
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
which is correct, as desired!
My question is: is the above 'orderBy then groupBy' code valid? Is the ordering guaranteed? Can we rely on this non-standard feature in serious production code?
The point of this question is that, in standard SQL, we can only do a GROUP BY and then an ORDER BY to sort the aggregated result, but not an ORDER BY followed by a GROUP BY; the GROUP BY will ignore the ordering imposed by the ORDER BY.
I also wonder: if Spark SQL can do such a GROUP BY under a desired ordering, could standard SQL also introduce a syntax for this?
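If this ordering guarantee turns out to be unreliable, an ordering-independent alternative I can fall back on is to take min/max over a (time, price) struct, so that open and close are tied to the earliest and latest timestamp rather than to row order. A sketch over the same data:
import org.apache.spark.sql.functions._
// Structs compare field by field, so min/max over (time, price) pick the rows with the earliest/latest time.
data.groupBy(to_date(col("time")).as("date")).agg(min(struct(col("time"), col("price"))).getField("price").as("open"), max("price").as("high"), min("price").as("low"), max(struct(col("time"), col("price"))).getField("price").as("close")).orderBy("date").show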
P.S.
I can think of some aggregations whose results depend on a deterministic ordering, for example (in an invented syntax):
WITH ORDER BY time SELECT COLLECT_LIST(price) GROUP BY stockID
WITH ORDER BY time SELECT SUM(SQUARE(price - LAG(price, 1, 0))) GROUP BY stockID
Without the WITH ORDER BY time, how can we sort the collected list in standard SQL?
These examples show that "GROUP BY under a desired ordering" is still useful.
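In Spark, the workaround I know of for the COLLECT_LIST example is to collect (time, price) structs, sort the array afterwards with sort_array, and then keep only the prices. A sketch, grouping the ticks above by date instead of by stockID:
import org.apache.spark.sql.functions._
// Collect (time, price) pairs, sort the array by time, then project out just the prices.
data.groupBy(to_date(col("time")).as("date")).agg(sort_array(collect_list(struct(col("time"), col("price")))).as("ticks")).select(col("date"), col("ticks.price").as("prices")).show(false)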