
I'm working on a clustering problem in sparklyr. Many of the variables in the training set are measured on different scales and thus differ by orders of magnitude. Per best practice I am trying to scale and center the data.

There are a number of different formulas to do this, the most traditional being (X - µ) / σ, where X is the random variable, µ = mean, and σ = standard deviation. I also like to use (X - x̄) / (x_max - x_min), where X = random variable, x̄ = sample mean, x_max = maximum value, and x_min = minimum value.
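
For reference, in plain (non-Spark) R the two formulas could be written like this (the function names here are just for illustration):

    # z-score standardization: (X - mean) / sd
    z_score <- function(x) (x - mean(x, na.rm = TRUE)) / sd(x, na.rm = TRUE)

    # mean-centered range scaling: (X - mean) / (max - min)
    range_scale <- function(x) {
      (x - mean(x, na.rm = TRUE)) / (max(x, na.rm = TRUE) - min(x, na.rm = TRUE))
    }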

I am getting a weird result after applying this transformation using dplyr. Consider this example:

    #connect to spark
    library(sparklyr)
    library(SparkR)
    library(dplyr)
    sc = spark_connect(master = 'yarn-client',
                       spark_home = '/usr/hdp/current/spark-client',
                       app_name = 'sparklyr'
                       # config = list(
                       #   "sparklyr.shell.executor-memory" = "XG",
                       #   "sparklyr.shell.driver-memory"   = "XG",
                       #   "spark.driver.maxResultSize"     = "XG" # may need to transfer a lot of data into R 
    )

    sparkR.init()

    #create a dataframe where variables in the dataset differ by an order of magnitude
    mat <- as.data.frame(matrix(data = rnorm(1000 * 50, mean = 100, sd = 10), nrow = 1000, ncol = 50))
    mat1 <- as.data.frame(matrix(data = rnorm(1000 * 50, mean = 0, sd = 1), nrow = 1000, ncol = 50))
    colnames(mat1) <- paste('X', 1:50, sep = '')
    mat.final <- cbind(mat, mat1)

    #copy to Spark
    dat.out <- sdf_copy_to(sc, mat.final, 'dat', overwrite = TRUE)

    #define centering and scaling function
    scale.center <- function(x){
      (x - mean(x, na.rm = TRUE)) / (max(x, na.rm = TRUE) - min(x, na.rm = TRUE))
    }

    #scale data
    dat.out1 <-
      dat.out %>%
      mutate_each(funs(s = scale.center))

The code runs, but I get something strange:

    str(dat.out1)

    $ ops:List of 4
      ..$ name: chr "mutate"
      ..$ x   :List of 4
      .. ..$ name: chr "mutate"
      .. ..$ x   :List of 4
      .. .. ..$ name: chr "mutate"
      .. .. ..$ x   :List of 4
      .. .. .. ..$ name: chr "mutate"
      .. .. .. ..$ x   :List of 4
      .. .. .. .. ..$ name: chr "mutate"
      .. .. .. .. ..$ x   :List of 4
      .. .. .. .. .. ..$ name: chr "mutate"
      .. .. .. .. .. ..$ x   :List of 4
      .. .. .. .. .. .. ..$ name: chr "mutate"
      .. .. .. .. .. .. ..$ x   :List of 4
      .. .. .. .. .. .. .. ..$ name: chr "mutate"
      .. .. .. .. .. .. .. ..$ x   :List of 4
      .. .. .. .. .. .. .. .. ..$ name: chr "mutate"
      .. .. .. .. .. .. .. .. ..$ x   :List of 4
      .. .. .. .. .. .. .. .. .. ..$ name: chr "mutate"
      .. .. .. .. .. .. .. .. .. ..$ x   :List of 4
      .. .. .. .. .. .. .. .. .. .. ..$ name: chr "mutate"
      .. ..

The above is just a portion of the output after running str. Thoughts on what's going wrong here? I'm surprised there isn't a built-in function for centering and scaling.


1 Answer


> Thoughts on what's going wrong here?

It is essentially the same problem as described in Sparklyr: how to center a Spark table based on column? - aggregate functions used inside mutate are expanded to global window functions (ones without a PARTITION BY clause), which makes this approach completely useless in practice.
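
You can see this expansion for yourself by rendering the SQL that dplyr generates (a quick illustration using one column of the example data above):

    dat.out %>%
      mutate(V1_centered = V1 - mean(V1, na.rm = TRUE)) %>%
      show_query()
    # mean(V1) is translated to something like AVG(V1) OVER (),
    # i.e. an empty (global) window evaluated once per row.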

> I'm surprised there isn't a built-in function for centering and scaling.

Well, in general Spark operates using ML Transformers, a number of which have been ported to sparklyr. These can be recognized by the ft_ prefix. Unfortunately StandardScaler and MinMaxScaler have not been ported yet. It is not that hard to implement your own interface, though.
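
(For what it's worth, later sparklyr releases expose these as ft_standard_scaler() and ft_min_max_scaler(). If your version has them, a minimal sketch could look like the following; MinMaxScaler operates on an assembled vector column, and the column names are illustrative.)

    # Sketch only: requires a sparklyr release that ships ft_min_max_scaler()
    dat.scaled <- dat.out %>%
      ft_vector_assembler(input_cols = colnames(dat.out),
                          output_col = "features") %>%
      ft_min_max_scaler(input_col = "features",
                        output_col = "features_scaled")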

If you want a quick fix that works on the data as-is:

    library(rlang)
    library(glue)

    # Compute all the stats at once
    stats <- dat.out %>% summarise_all(funs(avg, min, max)) %>% collect()

    # Separate stats into components
    cols <- dat.out %>% colnames()
    avgs <- stats %>% select(ends_with("avg")) %>% unlist
    mins <- stats %>% select(ends_with("min")) %>% unlist
    maxs <- stats %>% select(ends_with("max")) %>% unlist

    # Create expressions
    exprs <- glue("({cols} - {avgs}) / ({maxs} - {mins})") %>%
      setNames(cols) %>%
      lapply(parse_quosure)

    dat.out %>% mutate(!!! exprs)

Credits once again go to Artem Sokolov (dplyr 0.7 equivalent for deprecated mutate_).
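
As a quick sanity check (illustrative; after the mutate each rescaled column should have mean roughly 0 and range 1):

    dat.out %>%
      mutate(!!! exprs) %>%
      summarise(V1_mean  = mean(V1, na.rm = TRUE),
                V1_range = max(V1, na.rm = TRUE) - min(V1, na.rm = TRUE))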

Note:

Don't use `.` in the names of functions that are to be used with sparklyr (for example, prefer scale_center over scale.center). dplyr will try to match such names as database functions in a "prefix" database and will fail or produce unintended results.

  • The only issue I'm having with this solution is that it doesn't appear to scale well. My **actual** data has 500 columns and 12 million rows. `stats <- dat.out %>% summarise_all(funs(avg, min, max)) %>% collect()` has already been running for over 5 minutes. – schristel Dec 14 '17 at 14:10
  • By 5 minutes I mean 1 hour :/ – schristel Dec 14 '17 at 15:06
  • The data is in a hive table. My configuration for connecting to Spark R is `sc = spark_connect(master = 'yarn-client', spark_home = '/usr/hdp/current/spark-client', app_name = 'sparklyr', config = list( "sparklyr.shell.executor-memory" = "24G", "sparklyr.shell.driver-memory" = "16G", "spark.driver.maxResultSize" = "16G") ) sparkR.init()` I'm reading the data from hive using the dplyr function `tbl` – schristel Dec 14 '17 at 16:00
  • And I'm actually only reading in 1/12 of the table, that is, 1 million rows instead of 12 million – schristel Dec 14 '17 at 16:04