
Assume this is my data:

date         value
2016-01-01   1
2016-01-02   NULL
2016-01-03   NULL
2016-01-04   2
2016-01-05   3
2016-01-06   NULL
2016-01-07   NULL
2016-01-08   NULL
2016-01-09   1

I am trying to find the start and end dates that surround the NULL-value groups. An example output would be as follows:

start        end
2016-01-01   2016-01-04
2016-01-05   2016-01-09

My first attempt at the problem produced the following:

df.filter($"value".isNull)
  .agg(to_date(date_add(max("date"), 1)) as "max",
       to_date(date_sub(min("date"), 1)) as "min")

but this only finds the overall min and max. I thought of using groupBy, but I don't know how to create a column that identifies each block of null values.

paisanco
nik

2 Answers


The tricky part is getting the borders of the groups, so you need several steps:

  • first build groups of nulls/not-nulls (using window functions)
  • then group by block to get the borders within each block
  • then use window functions again to extend the borders

Here is a working example:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import ss.implicits._  // ss is the SparkSession

val df = Seq(
  ("2016-01-01", Some(1)),
  ("2016-01-02", None),
  ("2016-01-03", None),
  ("2016-01-04", Some(2)),
  ("2016-01-05", Some(3)),
  ("2016-01-06", None),
  ("2016-01-07", None),
  ("2016-01-08", None),
  ("2016-01-09", Some(1))
).toDF("date", "value")


df
  // build blocks
  .withColumn("isnull", when($"value".isNull, true).otherwise(false))
  .withColumn("lag_isnull", lag($"isnull",1).over(Window.orderBy($"date")))
  .withColumn("change", coalesce($"isnull"=!=$"lag_isnull",lit(false)))
  .withColumn("block", sum($"change".cast("int")).over(Window.orderBy($"date")))
  // now calculate min/max within groups
  .groupBy($"block")
  .agg(
    min($"date").as("tmp_min"),
    max($"date").as("tmp_max"),
    (count($"value")===0).as("null_block")
  )
  // now extend groups to include borders
  .withColumn("min", lag($"tmp_max", 1).over(Window.orderBy($"tmp_min")))
  .withColumn("max", lead($"tmp_min", 1).over(Window.orderBy($"tmp_max")))
  // only select null-groups
  .where($"null_block")
  .select($"min", $"max")
  .orderBy($"min")
  .show()

gives

+----------+----------+
|       min|       max|
+----------+----------+
|2016-01-01|2016-01-04|
|2016-01-05|2016-01-09|
+----------+----------+
Raphael Roth

I don't have a working solution, but I do have a few recommendations.

Look at using the lag window function; you will also have to do the same thing to produce a lead column (a short sketch follows below).
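A minimal sketch of that step, assuming a SparkSession named spark and the example data from the question. To line up with the tables below, the window here is ordered by descending date, and -1 is used as a non-NULL sentinel for the very first and last row so they are not mistaken for missing data:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# example data from the question; None becomes NULL in the DataFrame
df = spark.createDataFrame(
    [("2016-01-01", 1), ("2016-01-02", None), ("2016-01-03", None),
     ("2016-01-04", 2), ("2016-01-05", 3), ("2016-01-06", None),
     ("2016-01-07", None), ("2016-01-08", None), ("2016-01-09", 1)],
    ["date", "value"])

# one global window ordered by descending date
# (fine for a small example, but it pulls all rows into a single partition)
w = Window.orderBy(F.col("date").desc())

# lag looks one row back in this ordering (the next calendar day),
# lead looks one row forward (the previous day); -1 marks the borders
df_2 = (df
        .withColumn("lag_value", F.lag("value", 1, -1).over(w))
        .withColumn("lead_value", F.lead("value", 1, -1).over(w)))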

Now assume you have your lag and lead columns (as in the sketch above). The resulting DataFrame will look like this:

date         value     lag_value     lead_value
2016-01-01   1         NULL          1 
2016-01-02   NULL      NULL          1
2016-01-03   NULL      2             NULL
2016-01-04   2         3             NULL
2016-01-05   3         NULL          2
2016-01-06   NULL      NULL          3
2016-01-07   NULL      NULL          NULL
2016-01-08   NULL      1             NULL
2016-01-09   1         1             NULL

Now what you want to do is just filter by the following conditions:

min date:
df_2.filter("value IS NOT NULL AND lag_value IS NULL")

max date:
df_2.filter("value IS NOT NULL AND lead_value IS NULL")

If you want to be a bit more advanced, you can also use a when expression to create a new column that indicates whether the date is a start or end date for a null group:

date         value     lag_value     lead_value   group_date_type
2016-01-01   1         NULL          1            start
2016-01-02   NULL      NULL          1            NULL
2016-01-03   NULL      2             NULL         NULL   
2016-01-04   2         3             NULL         end
2016-01-05   3         NULL          2            start
2016-01-06   NULL      NULL          3            NULL
2016-01-07   NULL      NULL          NULL         NULL
2016-01-08   NULL      1             NULL         NULL
2016-01-09   1         1             NULL         end 

This can be created with something that looks like this:

from pyspark.sql import functions as F

df_3 = df_2.withColumn('group_date_type',
                       F.when(F.expr("value IS NOT NULL AND lag_value IS NULL"), 'start')
                        .when(F.expr("value IS NOT NULL AND lead_value IS NULL"), 'end')
                        .otherwise(None))
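Assuming the result above is kept as df_3, the labelled boundary rows can then be pulled out for a quick check:

# show only the rows that mark the start or end of a null group
df_3.filter("group_date_type IS NOT NULL") \
    .select("date", "group_date_type") \
    .orderBy("date") \
    .show()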
J Schmidt