I have data in the format below:

+---------------------+----+----+---------+----------+
|      date_time      | id | cm | p_count |   bcm    |
+---------------------+----+----+---------+----------+
| 2018-02-01 04:38:00 | v1 | c1 |       1 |  null    |
| 2018-02-01 05:37:07 | v1 | c1 |       1 |  null    |
| 2018-02-01 11:19:38 | v1 | c1 |       1 |  null    |
| 2018-02-01 12:09:19 | v1 | c1 |       1 |  c1      |
| 2018-02-01 14:05:10 | v2 | c2 |       1 |  c2      |
+---------------------+----+----+---------+----------+

I need to find the rolling sum of the p_count column between two date_time values, partitioned by id.

The logic for the start_date_time and end_date_time of the rolling-sum window is as follows:

start_date_time=min(date_time) group by (id,cm)

end_date_time= bcm == cm ? date_time : null

In this case, start_date_time = 2018-02-01 04:38:00 and end_date_time = 2018-02-01 12:09:19.

The output should look like this:

+---------------------+----+----+---------+----------+-------------+
|      date_time      | id | cm | p_count |   bcm    | p_sum_count |
+---------------------+----+----+---------+----------+-------------+
| 2018-02-01 04:38:00 | v1 | c1 |       1 |  null    |1            |
| 2018-02-01 05:37:07 | v1 | c1 |       1 |  null    |2            |
| 2018-02-01 11:19:38 | v1 | c1 |       1 |  null    |3            |
| 2018-02-01 12:09:19 | v1 | c1 |       1 |  c1      |4            |
| 2018-02-01 14:05:10 | v2 | c2 |       1 |  c2      |1            |
+---------------------+----+----+---------+----------+-------------+
Shrashti

1 Answer

// Sample data from the question; bcm is null until it matches cm.
val input = sqlContext.createDataFrame(Seq(
            ("2018-02-01 04:38:00", "v1", "c1", 1, null),
            ("2018-02-01 05:37:07", "v1", "c1", 1, null),
            ("2018-02-01 11:19:38", "v1", "c1", 1, null),
            ("2018-02-01 12:09:19", "v1", "c1", 1, "c1"),
            ("2018-02-01 14:05:10", "v2", "c2", 1, "c2")
            )).toDF("date_time", "id", "cm", "p_count", "bcm")

input.show()

Results:

+-------------------+---+---+-------+----+
|          date_time| id| cm|p_count| bcm|
+-------------------+---+---+-------+----+
|2018-02-01 04:38:00| v1| c1|      1|null|
|2018-02-01 05:37:07| v1| c1|      1|null|
|2018-02-01 11:19:38| v1| c1|      1|null|
|2018-02-01 12:09:19| v1| c1|      1|  c1|
|2018-02-01 14:05:10| v2| c2|      1|  c2|
+-------------------+---+---+-------+----+

Next Code:

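        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions._

        // Register the DataFrame so it can be queried with SQL.
        input.createOrReplaceTempView("input_Table")

        //val results = spark.sqlContext.sql("SELECT sum(p_count) from input_Table tbl GROUP BY tbl.cm")

        // Running total over all rows. There is no PARTITION BY, so the sum does not restart per id.
        // date_time is added to the ordering so that rows with the same id are summed in a stable order.
        val results = sqlContext.sql("select *, " +
          "SUM(p_count) over ( order by id, date_time rows between unbounded preceding and current row ) cumulative_Sum " +
          "from input_Table")

        // show() returns Unit, so keep the DataFrame in results and display it separately.
        results.show()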

Results:

+-------------------+---+---+-------+----+--------------+
|          date_time| id| cm|p_count| bcm|cumulative_Sum|
+-------------------+---+---+-------+----+--------------+
|2018-02-01 04:38:00| v1| c1|      1|null|             1|
|2018-02-01 05:37:07| v1| c1|      1|null|             2|
|2018-02-01 11:19:38| v1| c1|      1|null|             3|
|2018-02-01 12:09:19| v1| c1|      1|  c1|             4|
|2018-02-01 14:05:10| v2| c2|      1|  c2|             5|
+-------------------+---+---+-------+----+--------------+

You need to partition the window (PARTITION BY) and add your own date-range logic on top of this to get the expected results.
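One possible reading of the date-range requirement, sketched with the DataFrame API: partition by (id, cm), take the earliest row where bcm equals cm as the end of the window, and accumulate p_count in date_time order up to that row. The names RollingSum, withRollingSum and the temporary column end_date_time are made up for this sketch. Two assumptions that the question does not settle: rows after the end row get null, and a partition with no bcm == cm row is summed over all its rows.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, sum, when}

object RollingSum {
  // Hypothetical helper: adds p_sum_count, the running total of p_count
  // per (id, cm), from the first row up to the row where bcm == cm.
  def withRollingSum(df: DataFrame): DataFrame = {
    // Whole partition: used to look up the end of the window.
    val byKey = Window.partitionBy("id", "cm")
    // Running frame: first row of the partition up to the current row.
    val running = byKey
      .orderBy("date_time")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    df
      // end_date_time = earliest date_time with bcm == cm, null if there is none.
      .withColumn("end_date_time",
        min(when(col("bcm") === col("cm"), col("date_time"))).over(byKey))
      // Assumption: rows after end_date_time get null instead of a sum.
      .withColumn("p_sum_count",
        when(col("end_date_time").isNull || col("date_time") <= col("end_date_time"),
          sum("p_count").over(running)))
      .drop("end_date_time")
  }
}
```

Calling RollingSum.withRollingSum(input) on the sample rows should give 1, 2, 3, 4 for v1/c1 and 1 for v2/c2. The date_time column is compared as a string here, which works only because the yyyy-MM-dd HH:mm:ss format sorts chronologically; casting it to a timestamp first would be the safer choice for other formats.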

ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

Logically a Windowed Aggregate Function is newly calculated for each row within the PARTITION based on all ROWS between a starting row and an ending row.

Starting and ending rows might be fixed or relative to the current row based on the following keywords:

  • UNBOUNDED PRECEDING, all rows before the current row -> fixed
  • UNBOUNDED FOLLOWING, all rows after the current row -> fixed
  • x PRECEDING, x rows before the current row -> relative
  • y FOLLOWING, y rows after the current row -> relative

Possible kinds of calculation include:

Both starting and ending row are fixed, the window consists of all rows of a partition, e.g. a Group Sum, i.e. aggregate plus detail rows

One end is fixed, the other relative to current row, the number of rows increases or decreases, e.g. a Running Total, Remaining Sum

Starting and ending row are relative to current row, the number of rows within a window is fixed, e.g. a Moving Average over n rows
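The three kinds of frame described above can be sketched with Spark's Window API. FrameKinds, withFrames and the column names k and x are placeholders for this illustration, not anything from the question. Note that the "fixed size" moving frame is shorter on the first rows, where fewer preceding rows exist.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, sum}

object FrameKinds {
  // Hypothetical helper: expects a sortable column "k" and a numeric column "x".
  def withFrames(df: DataFrame): DataFrame = {
    // No partitionBy: all rows form a single partition, ordered by k.
    val ordered = Window.orderBy("k")

    // Both ends fixed: every row sees the whole partition (group sum).
    val whole = ordered.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    // One end fixed, one relative: the frame grows row by row (running total).
    val running = ordered.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // Both ends relative: at most 3 rows, the current one and the 2 before it.
    val moving = ordered.rowsBetween(-2, Window.currentRow)

    df.withColumn("group_sum", sum("x").over(whole))
      .withColumn("running_total", sum("x").over(running))
      .withColumn("moving_avg_3", avg("x").over(moving))
  }
}
```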

So SUM(x) OVER (ORDER BY col ROWS UNBOUNDED PRECEDING) results in a Cumulative Sum or Running Total

11 -> 11
 2 -> 11 +  2                = 13
 3 -> 13 +  3 (or 11+2+3)    = 16
44 -> 16 + 44 (or 11+2+3+44) = 60
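The same arithmetic can be reproduced in plain Scala, without Spark, as a quick sanity check of what a running total does. RunningTotal and runningTotal are names invented for this sketch.

```scala
object RunningTotal {
  // Hypothetical helper: cumulative sum of xs in the order given.
  def runningTotal(xs: Seq[Int]): Seq[Int] =
    xs.scanLeft(0)(_ + _).tail // scanLeft emits the seed 0 first, so drop it
}
```

RunningTotal.runningTotal(Seq(11, 2, 3, 44)) returns Seq(11, 13, 16, 60), matching the table above.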

Reference: What is ROWS UNBOUNDED PRECEDING used for in Teradata?

vaquar khan
  • Hey Vaquar, thanks for the reply and the explanation, but this is not what I wanted. I needed to apply date-range conditions and partition by visitor id as well. Your solution only computes a simple rolling sum. – Shrashti Jul 02 '18 at 11:02