(Tested on Spark 2.2 and 2.3)

I am using Spark to aggregate stock trading ticks into daily OHLC (open-high-low-close) records.

The input data is like

val data = Seq(
  ("2018-07-11 09:01:00", 34.0), ("2018-07-11 09:04:00", 32.0),
  ("2018-07-11 09:02:00", 35.0), ("2018-07-11 09:03:00", 30.0),
  ("2018-07-11 09:00:00", 33.0), ("2018-07-12 09:01:00", 56.0),
  ("2018-07-12 09:04:00", 54.0), ("2018-07-12 09:02:00", 51.0),
  ("2018-07-12 09:03:00", 50.0), ("2018-07-12 09:00:00", 51.0)
).toDF("time", "price")

data.createOrReplaceTempView("ticks")

data.show

shown as

+-------------------+-----+
|               time|price|
+-------------------+-----+
|2018-07-11 09:01:00| 34.0|
|2018-07-11 09:04:00| 32.0|
|2018-07-11 09:02:00| 35.0|
|2018-07-11 09:03:00| 30.0|
|2018-07-11 09:00:00| 33.0|
|2018-07-12 09:01:00| 56.0|
|2018-07-12 09:04:00| 54.0|
|2018-07-12 09:02:00| 51.0|
|2018-07-12 09:03:00| 50.0|
|2018-07-12 09:00:00| 51.0|
+-------------------+-----+

Desired output is

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+

There have been many SQL solutions based on window functions, such as this and this:

SELECT DISTINCT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS open,
    MAX(price) OVER (PARTITION BY TO_DATE(time)) AS high,
    MIN(price) OVER (PARTITION BY TO_DATE(time)) AS low,
    LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS close
FROM ticks

Because of the limitations of SQL, these solutions are cumbersome: the windows need explicit frames, and a DISTINCT (or a second aggregation) is needed to collapse the per-tick rows into one row per day.

Today, I found that Spark SQL allows FIRST_VALUE and LAST_VALUE as aggregate functions in a GROUP BY, which standard SQL does not.

Because Spark SQL lifts this restriction, a neat and tidy solution is possible:

SELECT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST_VALUE(price) AS close
FROM ticks
GROUP BY TO_DATE(time)

You can try it

spark.sql("SELECT TO_DATE(time) AS date, FIRST(price) AS open, MAX(price) AS high, MIN(price) AS low, LAST(price) AS close FROM ticks GROUP BY TO_DATE(time)").show

shown as

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|34.0|35.0|30.0| 33.0|
|2018-07-12|56.0|56.0|50.0| 51.0|
+----------+----+----+----+-----+

However, the above result is incorrect. (Compare it with the desired result above.)

FIRST_VALUE and LAST_VALUE need a deterministic ordering to produce deterministic results; without one, they simply return whichever rows Spark happens to process first and last within each group, which depends on partitioning and task scheduling.

I can correct it by adding an orderBy before the grouping.

import org.apache.spark.sql.functions._

data
  .orderBy("time")
  .groupBy(expr("TO_DATE(time)").as("date"))
  .agg(
    first("price").as("open"),
    max("price").as("high"),
    min("price").as("low"),
    last("price").as("close"))
  .show

shown as

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+

which is correct, as desired!
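
For completeness, the same "order, then group" idea can be written in SQL with a sorted subquery (a sketch only; whether the subquery's ordering survives the aggregation is exactly what I am asking below):

spark.sql("""
  SELECT
      TO_DATE(time) AS date,
      FIRST(price)  AS open,
      MAX(price)    AS high,
      MIN(price)    AS low,
      LAST(price)   AS close
  FROM (SELECT * FROM ticks ORDER BY time) sorted_ticks
  GROUP BY TO_DATE(time)
""").show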

My question is: is the above 'orderBy then groupBy' code valid? Is this ordering guaranteed? Can we rely on this non-standard behavior in serious production code?

The point of this question is that, in standard SQL, we can only GROUP BY and then ORDER BY to sort the aggregated result, not ORDER BY and then GROUP BY; GROUP BY ignores any ordering established by a preceding ORDER BY.

I also wonder: if Spark SQL can perform such a GROUP BY under a desired ordering, could standard SQL also introduce a syntax for it?

P.S.

I can think of some aggregation functions that depend on a deterministic ordering, written below in the hypothetical syntax I am wishing for.

WITH ORDER BY time SELECT COLLECT_LIST(price) GROUP BY stockID

WITH ORDER BY time SELECT SUM(SQUARE(price - LAG(price, 1, 0))) GROUP BY stockID

Without the WITH ORDER BY time, how can we sort the collected list in standard SQL?

These examples show that "GROUP BY under a desired ordering" is still useful.
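
In Spark, the first example can actually be made deterministic without any pre-grouping ORDER BY, for example by collecting (time, price) structs and sorting the resulting array. This is a sketch only: the stockID column is hypothetical and is not part of the example data above.

import org.apache.spark.sql.functions._

// Hypothetical ticks table with (stockID, time, price) columns.
// Sorting the collected (time, price) structs by time makes the list
// deterministic without relying on any ordering before the GROUP BY.
spark.table("ticks")
  .groupBy(col("stockID"))
  .agg(sort_array(collect_list(struct(col("time"), col("price")))).as("ordered_ticks"))
  .show(false)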

John Lin
  • I wonder what's the point of providing `first_value, last_value` if we can use it like this, what's the intended use case if not this? :( – eugene Jan 05 '20 at 14:04
  • If some query result is ordered, and its grouping is order-preserving, then we can have first_value, last_value, min, max, sum, count ... in one pass. Don't need multiple partition-by statements. I believe spark's orderby+groupby is order-preserving because I got the correct result in the above example. – John Lin Jun 06 '21 at 07:10
  • As for use cases, I have shown 2: `COLLECT_LIST` and `price - LAG(price)` in the P.S. section of my question. The real intended use case was REALLY computing OHLC for stock ticks. – John Lin Jun 06 '21 at 08:04

1 Answer

Ordering in groupBy/agg is not guaranteed; you can use a window function, partitioning by the key and ordering by time.
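
For example, a sketch of that window-based approach on the question's data, using the trading date as the partition key and an explicit frame so that first/last see the whole day (column names follow the question's data DataFrame):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Partition each trading day, order ticks by time, and let the frame span
// the whole partition so first/last return the day's open/close prices.
val w = Window.partitionBy(to_date(col("time"))).orderBy(col("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

data
  .withColumn("date",  to_date(col("time")))
  .withColumn("open",  first("price").over(w))
  .withColumn("high",  max("price").over(w))
  .withColumn("low",   min("price").over(w))
  .withColumn("close", last("price").over(w))
  .select("date", "open", "high", "low", "close")
  .distinct()
  .orderBy("date")
  .show()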

K. Kostikov
  • I have not chosen your answer because I can't tell whether what you said "not guaranteed" is true or not. Maybe it is guaranteed but not documented? If you provide more evidence such as documentation or source code study, I will choose your answer. Thank you. – John Lin Jun 06 '21 at 06:54
  • What you provide about window functions was one of the known solutions in the hyperlinks in my question. My question is based on this. To make it clarified, I just now edited my question to add the windowing SQL statement. – John Lin Jun 06 '21 at 07:47