
I am trying to refactor my R code (shown below) into sparklyr code that works on a Spark dataset and produces the final result shown in Table 1:

Using help from the Stack Overflow posts Gather in sparklyr and SparklyR separate one Spark Data Frame column into two columns, I was able to get all the way there except for the last step, the spread.

Need Help:

  1. Implement spread via sparklyr (see the sdf_pivot sketch after the Spark code below)
  2. Optimize the code in any way

Table 1: Final output needed:

  var              n nmiss
1 Sepal.Length   150     0
2 Sepal.Width    150     0

R code to achieve it:

library(dplyr)
library(tidyr)
library(tibble)

data <- iris
data_tbl <- as_tibble(data)

profile <- data_tbl %>%
  select(Sepal.Length, Sepal.Width) %>%
  summarize_all(funs(
    n = n(),                           # row count
    nmiss = sum(as.numeric(is.na(.)))  # missing count
  )) %>%
  gather(variable, value) %>%
  separate(variable, c("var", "stat"), sep = "_(?=[^_]*$)") %>%
  spread(stat, value)

Spark Code:

library(sparklyr)

sdf_gather <- function(tbl){
  all_cols <- colnames(tbl)
  lapply(all_cols, function(col_nm){
    tbl %>%
      select(!!rlang::sym(col_nm)) %>%  # select the one column by name
      mutate(key = col_nm) %>%          # record its name as the key
      rename(value = !!rlang::sym(col_nm))
  }) %>%
    sdf_bind_rows() %>%
    select(key, value)
}


profile <- data_tbl %>%
  select(Sepal.Length, Sepal.Width) %>%
  summarize_all(funs(
    n = n(),
    nmiss = sum(as.numeric(is.na(.)))
  )) %>%
  sdf_gather() %>%
  ft_regex_tokenizer(input_col = "key", output_col = "KeySplit",
                     pattern = "_(?=[^_]*$)",
                     to_lower_case = FALSE) %>%  # keep original column-name case
  sdf_separate_column("KeySplit", into = c("var", "stat")) %>%
  select(var, stat, value) %>%
  sdf_register('profile')
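
I understand sparklyr also provides sdf_pivot(), which might stand in for spread here. A minimal, untested sketch of what I think the missing step could look like (var, stat and value are the columns produced above; each (var, stat) pair holds a single value, so "sum" simply passes it through):

# Hypothetical final step, not verified against the pipeline above
profile_wide <- profile %>%
  sdf_pivot(var ~ stat, fun.aggregate = list(value = "sum"))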

1 Answer

In this specific case (and in general whenever all columns have the same type; if you're interested only in missing-data statistics, even that can be relaxed) you can use a much simpler structure than this.

With data defined like this:

df <- copy_to(sc, iris, overwrite = TRUE) 

gather the columns (below I assume an sdf_gather function as defined in my answer to Gather in sparklyr; a compatible sketch follows the call):

long <- df %>% 
  select(Sepal_Length, Sepal_Width) %>% 
  sdf_gather("key", "value", "Sepal_Length", "Sepal_Width")

and then group and aggregate:

long %>% 
  group_by(key) %>% 
  summarise(n = n(), nmiss = sum(as.numeric(is.na(value)), na.rm=TRUE))

with the following result:

# Source: spark<?> [?? x 3]
  key              n nmiss
  <chr>        <dbl> <dbl>
1 Sepal_Length   150     0
2 Sepal_Width    150     0

Given the reduced size of the output, it is also fine to collect the result after aggregation:

agg <- df %>%
  select(Sepal_Length, Sepal_Width) %>%
  summarize_all(funs(
    n = n(),
    nmiss = sum(as.numeric(is.na(.)))  # missing count
  )) %>%
  collect()

and apply your gather / spread logic to the result:

agg %>% 
  tidyr::gather(variable, value) %>%
  tidyr::separate(variable, c("var", "stat"), sep = "_(?=[^_]*$)") %>% 
  tidyr::spread(stat, value) 
# A tibble: 2 x 3
  var              n nmiss
  <chr>        <dbl> <dbl>
1 Sepal_Length   150     0
2 Sepal_Width    150     0
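
As an aside, on tidyr >= 1.0 the same local reshaping can be written with pivot_longer() / pivot_wider(), which superseded gather() / spread(); a sketch operating on the collected agg tibble from above:

# Equivalent reshaping with the pivot_* API (tidyr >= 1.0)
agg %>%
  tidyr::pivot_longer(everything(), names_to = "variable", values_to = "value") %>%
  tidyr::separate(variable, c("var", "stat"), sep = "_(?=[^_]*$)") %>%
  tidyr::pivot_wider(names_from = stat, values_from = value)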

In fact, the latter approach should be superior performance-wise in this particular case.

  • There is no scalability issue with a structure like this one: since you perform a global aggregation, you have ncols * nfunctions values, and only this result, which is tiny, is collected. If you increase the number of columns or functions, you'll encounter serious planner issues before `collect` becomes a problem. In fact, the wide intermediate result is much more concerning here, though aggregating ~3500 columns is still feasible. – zero323 Jan 24 '19 at 22:37