
I have a Spark DF with the following structure:

+-----+-----------------------+--------+
| user| time                  | counts |
+-----+-----------------------+--------+
|   1 | 2018-06-04 16:00:00.0 | 5      |
|   1 | 2018-06-04 17:00:00.0 | 7      |
|   1 | 2018-06-04 17:30:00.0 | 7      |
|   1 | 2018-06-04 18:00:00.0 | 8      |
|   1 | 2018-06-04 18:30:00.0 | 10     |
|   1 | 2018-06-04 19:00:00.0 | 9      |
|   1 | 2018-06-04 20:00:00.0 | 7      |
|   2 | 2018-06-04 17:00:00.0 | 4      |
|   2 | 2018-06-04 18:00:00.0 | 4      |
|   2 | 2018-06-04 18:30:00.0 | 5      |
|   2 | 2018-06-04 19:30:00.0 | 7      |
|   3 | 2018-06-04 16:00:00.0 | 6      |
+-----+-----------------------+--------+

It was obtained from an event-log table using the following code:

import org.apache.spark.sql.functions.{sum, window}

val df = ranked
  .groupBy($"user", window($"timestamp", "30 minutes"))
  .agg(sum("id").as("counts"))
  .withColumn("time", $"window.start")

As can be seen from the time column, not every 30-minute interval registered events for every user, i.e. the per-user groups of rows have different lengths. I'd like to impute the missing time values (with NAs or 0s) and create a table (or RDD) like the following:

+-----+---------------------+---------------------+---------------------+-----+
| user| 2018-06-04 16:00:00 | 2018-06-04 16:30:00 | 2018-06-04 17:00:00 | ... |
+-----+---------------------+---------------------+---------------------+-----+
|   1 | 5                   | NA (or 0)           | 7                   | ... |
|   2 | NA (or 0)           | NA (or 0)           | 4                   | ... |
|   3 | 6                   | NA (or 0)           | NA (or 0)           | ... |
+-----+---------------------+---------------------+---------------------+-----+

The transpose of the table above (with a time column and one counts column per user) would theoretically work too, but I am not sure it would be optimal Spark-wise, as I have almost a million distinct users.

How can I restructure the table as described?

alonso s

1 Answer


If each time window appears for at least one user, a simple pivot would do the trick (and put null for the missing values). With millions of rows, that should be the case.

import org.apache.spark.sql.functions.sum

val reshaped_df = df.groupBy("user").pivot("time").agg(sum("counts"))
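If you already know the full set of 30-minute windows up front, you can also pass them to pivot explicitly; Spark then creates one column per value in the list, fills null where a user has no row, and skips the extra pass over the data to compute the distinct pivot values. The list below is purely hypothetical, and casting time to string first keeps the values comparable to the literals (a sketch, not a drop-in solution):

import org.apache.spark.sql.functions.sum

// Hypothetical window starts -- replace with the windows you actually expect.
// The exact string form of a cast timestamp can vary with the Spark version,
// so check df.select($"time".cast("string")).show() first.
val expectedTimes = Seq("2018-06-04 16:00:00", "2018-06-04 16:30:00", "2018-06-04 17:00:00")

val reshaped_df = df
  .withColumn("time", $"time".cast("string"))
  .groupBy("user")
  .pivot("time", expectedTimes)
  .agg(sum("counts"))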

In case a column is still missing, you can check the existing columns with reshaped_df.columns and add the absent ones. You would need to generate the list of columns that you expect (expected_columns) and then create the missing ones as follows:

import org.apache.spark.sql.functions.lit

val expected_columns: Seq[String] = ???  // the full list of column names you expect

var result = reshaped_df
expected_columns.foreach { c =>
  if (!result.columns.contains(c))
    result = result.withColumn(c, lit(null))  // or lit(0) to impute zeros
}
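For completeness, here is one way expected_columns could be built, assuming you want every 30-minute window start between a known first and last timestamp. The bounds below are hypothetical, and the exact column-name format depends on how Spark rendered the pivot values, so compare against reshaped_df.columns:

import java.sql.Timestamp
import java.time.LocalDateTime

// Hypothetical bounds -- in practice derive them from the data (e.g. min/max of "time").
val first = LocalDateTime.parse("2018-06-04T16:00:00")
val last  = LocalDateTime.parse("2018-06-04T20:00:00")

val expected_columns: Seq[String] = Iterator
  .iterate(first)(_.plusMinutes(30))         // step in 30-minute increments
  .takeWhile(!_.isAfter(last))
  .map(t => Timestamp.valueOf(t).toString)   // e.g. "2018-06-04 16:00:00.0"
  .toSeq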
Oli
  • How can I reasonably impute the missing time windows? – alonso s Aug 15 '18 at 22:57
  • You could iterate over the list of columns you expect to find, and add them if they are missing. I edited the answer and added a little bit of code. I don't know what columns you expect though so you still need to generate the list ;-) – Oli Aug 16 '18 at 08:23