
I'm trying to find missing minutes in my time-series dataset. I wrote R code that works locally on a small sample:

library(dplyr)
library(tidyr)
test <- dfv %>% mutate(timestamp = as.POSIXct(DaySecFrom.UTC.)) %>%
  complete(timestamp = seq.POSIXt(min(timestamp), max(timestamp), by = 'min'), ElemUID)

But complete() from tidyr can't be used on a tbl_spark:

Error in UseMethod("complete_") : 
  no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

Here is some test data:

ElemUID     ElemName        Kind   Number   DaySecFrom(UTC)           DaySecTo(UTC)
399126817   A648/13FKO-66   DEZ             2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
483492732   A661/18FRS-97   DEZ    120.00   2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
399126819   A648/12FKO-2    DEZ     60.00   2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
399126818   A648/12FKO-1    DEZ    180.00   2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
399126816   A648/13FKO-65   DEZ             2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
398331142   A661/31OFN-1    DEZ    120.00   2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
398331143   A661/31OFN-2    DEZ             2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
483492739   A5/28FKN-65     DEZ             2017-07-01 23:58:00.000   2017-07-01 23:59:00.000
483492735   A661/23FRS-97   DEZ     60.00   2017-07-01 23:58:00.000   2017-07-01 23:59:00.000

Is there any other way or workaround to solve this task on a Spark cluster in R? I would really appreciate your help!

1 Answer


Find the min and max values as epoch time:

library(sparklyr)
library(dplyr)

# `sc` is an existing spark_connection; toy data with gaps in the minute sequence
df <- copy_to(sc, tibble(id = 1:4, timestamp = c(
    "2017-07-01 23:49:00.000", "2017-07-01 23:50:00.000",
    # 23:51-23:55 missing
    "2017-07-01 23:56:00.000",
    # 23:57 missing
    "2017-07-01 23:58:00.000")
), "df", overwrite = TRUE)

min_max <- df %>% 
  summarise(min(unix_timestamp(timestamp)), max(unix_timestamp(timestamp))) %>% 
  collect() %>% 
  unlist()

Generate a reference range from min(epoch_time) to max(epoch_time) + interval:

library(glue) 

query <- glue("SELECT id AS timestamp FROM RANGE({min_max[1]}, {min_max[2] + 60}, 60)") %>%
  as.character()

ref <- spark_session(sc) %>% invoke("sql", query) %>% 
  sdf_register() %>%
  mutate(timestamp = from_unixtime(timestamp, "yyyy-MM-dd HH:mm:ss.SSS"))
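If you prefer to stay in sparklyr's high-level API, sdf_seq() can build the same reference range without a hand-written SQL query; a minimal sketch, assuming the same sc connection and min_max vector (the end point is bumped by one interval, mirroring the + 60 in the SQL version, in case the upper bound is exclusive):

# Sketch using sdf_seq() instead of the raw SQL RANGE query;
# the generated column is named `id`, as in the SQL version.
ref <- sdf_seq(sc, from = as.integer(min_max[1]), to = as.integer(min_max[2] + 60), by = 60L) %>%
  transmute(timestamp = from_unixtime(id, "yyyy-MM-dd HH:mm:ss.SSS"))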

Join both (a left join from the reference range keeps every minute):

ref %>% left_join(df, by="timestamp")
# Source:   lazy query [?? x 2]
# Database: spark_connection
   timestamp                  id
   <chr>                   <int>
 1 2017-07-01 23:49:00.000     1
 2 2017-07-01 23:50:00.000     2
 3 2017-07-01 23:51:00.000    NA
 4 2017-07-01 23:52:00.000    NA
 5 2017-07-01 23:53:00.000    NA
 6 2017-07-01 23:54:00.000    NA
 7 2017-07-01 23:55:00.000    NA
 8 2017-07-01 23:56:00.000     3
 9 2017-07-01 23:57:00.000    NA
10 2017-07-01 23:58:00.000     4
# ... with more rows
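
Since the original question is about finding the missing minutes, one way is to keep only the unmatched rows; a NA id marks a gap:

# Only the minutes with no matching row in df, i.e. the missing minutes
ref %>% left_join(df, by = "timestamp") %>% filter(is.na(id))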

Note:

If you experience issues related to SPARK-20145, you can replace the SQL query with:

spark_session(sc) %>%
  invoke("range", as.integer(min_max[1]), as.integer(min_max[2]), 60L) %>% 
  sdf_register()
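
As with the SQL variant, this gives a single id column of epoch seconds, so the same from_unixtime() mutate used for ref above still applies before joining.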
  • How do I use the `invoke()` functionality together with tbl(sc, df) in this example? I already cached the data with tbl_cache(). My data is located in a Hadoop cluster / Hive external table. Up to defining the ref df it works fine. Thanks for that! – user60856839 Apr 17 '18 at 19:34
  • It is the same logic. How you load the data doesn't matter; the `invoke` part doesn't really use the input data. – zero323 Apr 17 '18 at 19:55
  • Is it possible to check which ElemUID the missing timestamp refers to? Like: at ElemUID "399126817", timestamp "2017-9-19 23:50:00" is missing. Maybe with some `group_by()` command before `invoke`? – user60856839 Apr 18 '18 at 11:50
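
A sketch of the per-ElemUID variant asked about in the last comment (not part of the original answer): cross join the distinct ElemUID values with the reference range, then left join on both keys. Here `df_full` is a hypothetical name for the real Spark table, assumed to carry ElemUID plus a timestamp string in the same yyyy-MM-dd HH:mm:ss.SSS format as ref.

# Hypothetical `df_full`: the full table with ElemUID and a formatted `timestamp` column
elems <- df_full %>% distinct(ElemUID) %>% mutate(dummy = 1L)

grid <- ref %>%
  mutate(dummy = 1L) %>%
  inner_join(elems, by = "dummy") %>%   # cross join: every minute for every ElemUID
  select(-dummy)

grid %>%
  left_join(df_full, by = c("ElemUID", "timestamp")) %>%
  filter(is.na(ElemName))               # unmatched rows = missing minutes per ElemUID
                                        # (assumes ElemName is never NA in the real data)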