I am trying to find missing timestamps. There are many solutions for that problem on its own. However, I also want to find out for which ID each timestamp is missing.

For example, the test dataset looks like this:

elemuid timestamp
1232    2018-02-10 23:00:00
1232    2018-02-10 23:01:00
1232    2018-02-10 22:58:00
1674    2018-02-10 22:40:00
1674    2018-02-10 22:39:00
1674    2018-02-10 22:37:00
1674    2018-02-10 22:35:00

And the result should look like this:

elemuid timestamp
1232    2018-02-10 22:59:00
1674    2018-02-10 22:38:00
1674    2018-02-10 22:36:00

My problem is that I can only use dplyr, because I also want to use this code in sparklyr. I would be grateful for your help!

Alper t. Turker
user60856839

2 Answers

Here is one option with anti_join. Assuming that the 'timestamp' column is not already a date-time object, we first convert it to POSIXct:

library(tidyverse)
library(lubridate)  # ymd_hms() comes from lubridate; older tidyverse versions do not attach it
df1 <- df1 %>%
          mutate(timestamp = ymd_hms(timestamp))

Then, grouped by 'elemuid', use complete to expand 'timestamp' in 1-minute steps and do an anti_join with the original dataset:

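df1 %>%
    group_by(elemuid) %>% 
    # fill in every minute between each ID's first and last timestamp
    complete(timestamp = seq(min(timestamp), max(timestamp), by = "1 min")) %>% 
    # keep only the rows that are absent from the original data
    anti_join(df1, by = c("elemuid", "timestamp"))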
# A tibble: 3 x 2
# Groups: elemuid [?]
#   elemuid timestamp          
#     <int> <dttm>             
#1    1232 2018-02-10 22:59:00
#2    1674 2018-02-10 22:36:00
#3    1674 2018-02-10 22:38:00
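
For reference, here is a self-contained sketch of the same approach that can be run locally. It rebuilds the question's test dataset by hand; the `tz = "UTC"` setting and the name `gaps` are assumptions made for this illustration, and `as.POSIXct` is used in place of `ymd_hms` only to avoid the extra dependency.

```r
library(dplyr)
library(tidyr)

# Test data from the question (time zone is an assumption for this sketch)
df1 <- tibble(
  elemuid = c(1232L, 1232L, 1232L, 1674L, 1674L, 1674L, 1674L),
  timestamp = as.POSIXct(c(
    "2018-02-10 23:00:00", "2018-02-10 23:01:00", "2018-02-10 22:58:00",
    "2018-02-10 22:40:00", "2018-02-10 22:39:00", "2018-02-10 22:37:00",
    "2018-02-10 22:35:00"
  ), tz = "UTC")
)

gaps <- df1 %>%
  group_by(elemuid) %>%
  # one row per minute between each ID's first and last timestamp
  complete(timestamp = seq(min(timestamp), max(timestamp), by = "1 min")) %>%
  ungroup() %>%
  # drop everything that exists in the original data
  anti_join(df1, by = c("elemuid", "timestamp"))

print(gaps)  # three rows: 22:59 for 1232, 22:36 and 22:38 for 1674
```

This only works on a local data frame, since `complete` has no Spark translation.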
akrun
  • that's a straightforward and intuitive answer. As I mentioned my problem is that I can only use `dplyr`. `ymd_hms` and `complete` are not supported functions by sparklyr. I asked a similar question a few days ago: [Here](https://stackoverflow.com/questions/49871925/complete-time-series-with-sparklyr). Now I don't know how to change the answer to fulfill my task with ID. – user60856839 Apr 21 '18 at 11:54
  • @luks_wi Would `base R` functions like `expand.grid` and `as.POSIXct` work? i.e. `df1 <- df1 %>% mutate(timestamp = as.POSIXct(timestamp))` – akrun Apr 21 '18 at 11:55
  • @luks_wi I thought these functions would work because they are part of the `tidyverse` – akrun Apr 21 '18 at 11:56
  • unfortunately you can't use them within a spark-dataframe.. :/ I would have to copy it to an R-dataframe, but then I am losing the speed of spark. The exception is always: _Error: org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'as' not found;_ – user60856839 Apr 21 '18 at 12:01
  • what error do you get? do you run it in a spark-context? – user60856839 Apr 21 '18 at 12:28
  • Yes, i was running in that spark-context. The error is `error: org.apache.spark.sql.AnalysisException: could not resolve `RANGE` to a table-valued function; lin` – akrun Apr 21 '18 at 12:30
  • That's interesting, because it works for me. Maybe you didn't install `glue`-package. That's all I can suggest and imagine – user60856839 Apr 21 '18 at 12:37
  • I have all the packages installed – akrun Apr 21 '18 at 12:39
  • @akrun That's weird. How about `spark_session(sc) %>% invoke("range", as.integer(min_max[1]), as.integer(min_max[2])) %>% sdf_register()` where `sc` is `spark_connection` object? – zero323 Apr 21 '18 at 21:41
  • Sounds like [SPARK-20145](https://issues.apache.org/jira/browse/SPARK-20145). Regarding the question - to make it work in Spark you'll need cross product with distinct IDs, but you have to make sure you'll do it with SQL, not a dplyr way with dummy, otherwise you'll end up with a single partition, so it is probably easier to write it in SQL all the way. – zero323 Apr 22 '18 at 14:35
  • Oh, and enable [cross joins](https://stackoverflow.com/a/39000050/6910411). – zero323 Apr 22 '18 at 14:41
  • @user6910411 Please post it as a solution – akrun Apr 22 '18 at 15:44
  • @user6910411 where do I have to write the command `df1.crossJoin(df2)`. What does it mean? And how to work with different start-/end-timestamps by group like: id: 10, timestamp: "2018-06-06 10:21:00", id: 5, timestamp: "2018-06-06 10:21:01". So do I have to create a _reference-dataset_ for all groups? – user60856839 Apr 24 '18 at 12:18

For simplicity, let's assume you've already followed the instructions from your previous question and computed the minimum and maximum (min_max) Epoch time in seconds.
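
If `min_max` is not at hand, the following is one way it might be computed. This is a sketch only: it assumes a local Spark installation, a connection named `sc`, and a Spark table `df` with a timestamp-typed `timestamp` column; the column names `min_ts` and `max_ts` are made up for this illustration.

```r
library(sparklyr)
library(dplyr)

# Assumption: a local Spark installation is available
sc <- spark_connect(master = "local")

# Assumption: small stand-in for the real data, copied into Spark as table "df"
local_df <- tibble(
  elemuid = c(1232L, 1232L, 1674L),
  timestamp = as.POSIXct(
    c("2018-02-10 22:58:00", "2018-02-10 23:01:00", "2018-02-10 22:35:00"),
    tz = "UTC"
  )
)
df <- copy_to(sc, local_df, "df", overwrite = TRUE)

# unix_timestamp() is a Spark SQL function; dplyr passes it through untranslated
min_max <- df %>%
  summarise(
    min_ts = min(unix_timestamp(timestamp), na.rm = TRUE),
    max_ts = max(unix_timestamp(timestamp), na.rm = TRUE)
  ) %>%
  collect() %>%
  unlist()
```

Only the two aggregated values are collected to R, so the data itself stays in Spark.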

The remaining steps are quite similar to the ones we followed before:

  • Generate a range of values in 60-second steps:

    epoch_range <- spark_session(sc) %>% 
      invoke("range", as.integer(min_max[1]), as.integer(min_max[2]), 60L) %>%
      invoke("withColumnRenamed", "id", "timestamp")
    
  • Compute the distinct elemuid values:

    elemuids <- df %>% select(elemuid) %>% distinct() %>% spark_dataframe()
    

Now, we want to generate a reference table as a Cartesian product of the range and unique ids:

ref <- epoch_range %>% 
  invoke("crossJoin", elemuids) %>% 
  sdf_register() %>%
  mutate(timestamp = from_unixtime(timestamp, "yyyy-MM-dd HH:mm:ss.SSS"))

You might be tempted to use a more dplyr-ish method:

sdf_register(epoch_range) %>% mutate(dummy = 1) %>% 
  left_join(sdf_register(elemuids) %>% mutate(dummy = 1), by = "dummy") %>%
  select(-dummy)

This works fine if the product is small (in that case Spark should use a broadcast join). Otherwise it causes complete data skew, because every row shares the same dummy key and ends up in a single partition, so it is not safe to use in general.

Finally, we outer join the data as before:

ref %>% left_join(df, by = c("timestamp", "elemuid"))

to fill in the gaps, or (as already explained in the answer provided by akrun) anti join to keep only the missing points:

ref %>% anti_join(df, by = c("timestamp", "elemuid"))
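
Note that `ref` spans the global minimum and maximum for every ID, so the anti join also reports timestamps outside an individual ID's own range. The sketch below illustrates the reference-table logic on a local data frame (not Spark) using the question's test data, and adds one possible way to restrict the result to each ID's own range. The names `bounds`, `lo`, `hi` and `gaps` are hypothetical, and `crossing` stands in for the Spark `crossJoin`.

```r
library(dplyr)
library(tidyr)

# Test data from the question (time zone is an assumption for this sketch)
df <- tibble(
  elemuid = c(1232L, 1232L, 1232L, 1674L, 1674L, 1674L, 1674L),
  timestamp = as.POSIXct(c(
    "2018-02-10 23:00:00", "2018-02-10 23:01:00", "2018-02-10 22:58:00",
    "2018-02-10 22:40:00", "2018-02-10 22:39:00", "2018-02-10 22:37:00",
    "2018-02-10 22:35:00"
  ), tz = "UTC")
)

# Reference table: every minute in the global range, for every ID
ref <- crossing(
  timestamp = seq(min(df$timestamp), max(df$timestamp), by = "1 min"),
  elemuid = unique(df$elemuid)
)

# First and last observed timestamp per ID
bounds <- df %>%
  group_by(elemuid) %>%
  summarise(lo = min(timestamp), hi = max(timestamp))

gaps <- ref %>%
  anti_join(df, by = c("timestamp", "elemuid")) %>%  # points absent from the data
  inner_join(bounds, by = "elemuid") %>%
  filter(timestamp >= lo, timestamp <= hi) %>%       # keep each ID's own range only
  select(elemuid, timestamp) %>%
  arrange(elemuid, timestamp)

print(gaps)  # three rows: 22:59 for 1232, 22:36 and 22:38 for 1674
```

The join and filter verbs used here are plain dplyr, so the same restriction should carry over to Spark tables, though that has not been verified here. In either setting the join keys must have identical types and formats on both sides; otherwise nothing matches and the anti join returns all of `ref`.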
user60856839
zero323
  • Ahhh..perfect! That's all I've been looking for. Meanwhile I tried my luck with `spark_apply()` to extend _sparklyr_-functionality, but unfortunately it did not work so well. All I got was an _error with status: 255_. Nevertheless I would like to thank you very much for your help! – user60856839 Apr 24 '18 at 15:02
  • why is this method not working with millions of rows? the ref-dataset looks good, but count of dates per ID is the same like after join..? – user60856839 Apr 25 '18 at 13:40
  • There might be no missing values. In general try to reduce problem to MCVE. For example does it happen when you take a small sample? – zero323 Apr 25 '18 at 13:41
  • but if _anti-join_-table is as large as ref-table it means that all dates are missing and that's impossible.. – user60856839 Apr 25 '18 at 13:59
  • Huh... Are you sure that types match? I mean there is no weird implicit conversion on the way. – zero323 Apr 25 '18 at 14:03
  • And what does the execution plan look like? (you can use `optimizedPlan` function from [here](https://stackoverflow.com/a/43614889/6910411)). – zero323 Apr 25 '18 at 14:08
  • ah, I found an error. so if one ID with a timestamp is already used, then he does not check whether the timestamp is also present with the other ID. I am looking for a solution – user60856839 Apr 25 '18 at 14:13
  • the result from the above dataset is always pairwise missing dates by IDs. So there should be also e.g. `1232 2018-02-10 22:40:00`, but it is not, because the date is already at the other ID. – user60856839 Apr 25 '18 at 14:23
  • It worked, but only when i do a collect() on ref-dataframe and then copy it back to spark-context. – user60856839 Apr 25 '18 at 18:35
  • That's unexpected. Do you get any warnings in the log? – zero323 Apr 25 '18 at 19:25
  • no, nothing at all. It seemed to work, but only in local spark context. At a yarn-client it is just the same as before. Do I have to disable `crossJoin` afterwards? I can't imagine what the problem could be, because _ref-dataset_ looks just perfect, but the result after `anti_join` is just the same as _ref_ and that's not possible. (I checked some dates by hand..) – user60856839 Apr 26 '18 at 09:32
  • No, actually you don't even have to enable it as we use `crossJoin` explicitly (before I thought about SQL version like `SELECT * FROM epoch_range JOIN elemuids`, then it would be required). TBH it sounds like a bug. We could try rewriting this by hand with outer join, but since I cannot reproduce the problem, I cannot say whether it makes any difference. And if you can get MCVE yourself, and confirm it with supported API, then I'd recommend opening a JIRA ticket (I don't think it is introduced by `sparklyr`) – zero323 Apr 26 '18 at 09:46
  • That was not the answer I was hoping for, but that's the way it is. Thanks anyway :) – user60856839 Apr 26 '18 at 09:53
  • I'll experiment with this when I have a spare moment, and see if I can reproduce this thing somehow. Which version of Spark do you use? – zero323 Apr 26 '18 at 09:57
  • That would be awesome! spark 2.2.0. – user60856839 Apr 26 '18 at 09:59
  • Huh, could you try updating to the latest 2.2 release (2.2.1)? – zero323 Apr 26 '18 at 10:01
  • Yes, I can use 2.2.1, because it is supported by dataProc. But as far as I can tell it has not changed result. – user60856839 Apr 26 '18 at 10:11
  • Your method works excellent..I was dumb and used the wrong date-format without milliseconds. – user60856839 May 02 '18 at 09:34