5

Consider this simple example that uses sparklyr:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

The dataframe is reasonably tiny in size (about 70k rows and 14k unique words).

Now, training a naive bayes model only takes a few seconds on my cluster. First, I define the pipeline

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

then training the naive bayes model

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3

Now the problem is that trying to run any tree-based model (random forest, boosted trees, etc) on the same (actually tiny!!) dataset will not work.

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240,
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 1580, 1.1.1.1.1, executor 5): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

I think this is due to the sparseness of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A spark problem? Is my code non-efficient?

Thanks!

Community
  • 1
  • 1
ℕʘʘḆḽḘ
  • 18,566
  • 34
  • 128
  • 235

2 Answers2

5

You are getting this error because you are actually hitting the famous 2G limit that we have in Spark https://issues.apache.org/jira/browse/SPARK-6235

The solution is to repartition your data before feeding it to the algorithm.

This is actually two gotchas in this post :

  • Working with local data.
  • Tree based models in Spark are memory hungry.

So, let’s review your code which seems harmless;

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>% 
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

So what does the last line do ?

copy_to (not designed for big data sets), actually just copies the local R data frame to a 1 partition Spark DataFrame

So you’ll just need to repartition your data to make sure that once the pipeline prepares your data before feeding into gbt, the partition size is smaller than 2GB.

So you can just do the following to repartition your data :

# 20 is an arbitrary number I chose to test and it seems to work well in this case, 
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <- 
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% 
 sdf_repartition(partitions = 20)

PS1: max_memory_in_mb is the amount of memory you are giving for gbt to computes it's statistics. It's not related directly to the amount of data as input.

PS2: If you didn't set up enough memory to your executors, you might run into a java.lang.OutOfMemoryError : GC overhead limit exceeded

EDIT: What's the meaning of repartitioning data ?

We can always refer to the definition of what a partition is before talking about repartitioning. I'll try to be short.

A partition is a logical chunk of a large distributed data set.

Spark manages data using partitions that helps parallelize distributed data processing with minimal network traffic for sending data between executors. By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions to hold the data chunks.

Increasing partitions count will make each partition to have less data (or not at all!)

source: excerpt from @JacekLaskowski Mastering Apache Spark book.

But data partitions isn't always right, like in this case. So repartition is needed. (sdf_repartition for sparklyr)

sdf_repartition will scatter and shuffle your data across your nodes. i.e sdf_repartition(20) will create of 20 partitions of your data instead of the 1 you originally have in this case.

I hope this helps.

The whole code :

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240, # this is amount of data that can be use for 
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') 

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7> 
#   Stages 
# |--1 RegexTokenizer (Transformer)
# |    <regex_tokenizer_1ce4342b543b> 
# |     (Parameters -- Column Names)
# |      input_col: text
# |      output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# |    <count_vectorizer_1ce4e0e6489> 
# |     (Parameters -- Column Names)
# |      input_col: mytoken
# |      output_col: finaltoken
# |     (Transformer Info)
# |      vocabulary: <list> 
# |--3 GBTClassificationModel (Transformer)
# |    <gbt_classifier_1ce41ab30213> 
# |     (Parameters -- Column Names)
# |      features_col: finaltoken
# |      label_col: label
# |      prediction_col: pcol
# |      probability_col: prcol
# |      raw_prediction_col: rpcol
# |     (Transformer Info)
# |      feature_importances:  num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ... 
# |      num_classes:  int 2 
# |      num_features:  int 39158 
# |      total_num_nodes:  int 540 
# |      tree_weights:  num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ... 
# |      trees: <list> 
eliasah
  • 39,588
  • 11
  • 124
  • 154
  • Data has less than 6MB on disk and 4.1 MB in memory. That's really far away from 2GB limits (even correcting for Spark's inefficiency) Just saying... :) – zero323 Jun 08 '18 at 10:59
  • I agree but for some reason. 250k vocab in a 1 partition hits the 2005mb according to the logs... – eliasah Jun 08 '18 at 11:04
  • I don't think this is really the core problem. Take a look at the feature extraction pipeline and think how the decision tree will be derived. – zero323 Jun 08 '18 at 11:06
  • thanks for helping! @user6910411 do you have an idea here? My original dataset is actually read from `spark_read_parquet` and is much bigger. When I try to run a similar code above the code just crashes with another error, but I think as you suggest there might be a common issue here – ℕʘʘḆḽḘ Jun 08 '18 at 11:08
  • 1
    @ℕʘʘḆḽḘ Maybe, but it is a longish answer, and not exactly one for SO. With minor tweaks I could fit the model on a single virtual core / 1GB memory, albeit it took forever (~8 hours or so, not exactly satisfying with 5MB of data). If no one provides a fully satisfying answer, I'll try to revisit this later this week, and provide some suggestions. – zero323 Jun 08 '18 at 11:26
  • could you please tell me which tweaks you used? I can try on my side meanwhile!! – ℕʘʘḆḽḘ Jun 08 '18 at 11:30
  • @eliasah I can try the `sdf_repartition` thing, but I am always confused as to how many partitions to choose. Say I have `n` nodes, each with `c` cpus and total ram per node `R`, and my dataset is of size `D` the question is how many repartitions should I choose? `n`, the number of nodes? – ℕʘʘḆḽḘ Jun 08 '18 at 11:31
  • For some reason, the repartition worked for me and I was able to fit the model after repartitioning the data. I still believe that block (in 1 partition) was bigger than the allowed 2G @ℕʘʘḆḽḘ – eliasah Jun 08 '18 at 11:48
  • @ℕʘʘḆḽḘ there is no secret recipe, I advise you reading the following https://stackoverflow.com/questions/35800795/number-of-partitions-in-rdd-and-performance-in-spark and https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/sparksqlshufflepartitions_draft.html – eliasah Jun 08 '18 at 11:50
  • eliasah you might be right. what is this 2G error exactly? each node (or executor) cannot have a bigger data than 2GB? Also, what is the meaning of setting repartition to 10? it creates 10 tasks? or split the data in 10 buckets? thanks again! – ℕʘʘḆḽḘ Jun 09 '18 at 03:51
  • @ℕʘʘḆḽḘ That's the block size limit. Basically, the block abstraction in Spark is a ByteBuffer, which unfortunately has a limit of Integer.MAX_VALUE (~2GB). A ByteBuffer is a Java object used conceptionally in IO interactions. – eliasah Jun 09 '18 at 07:39
  • What seem to be happening here is that those 5mb as input becomes gigabytes in shuffle... I ignore the reason so far. I didn't dig deeper – eliasah Jun 09 '18 at 07:42
  • thanks @eliasah @user6910411 ! `sdf_repartition` seems tand o be the right solution but when I tried with my big dataset. while everything seemed to work fine overnight, this morning I discover `Error: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout`... Have you ever seen that? – ℕʘʘḆḽḘ Jun 09 '18 at 11:38
  • as a matter of fact, I set `sdf_repartition` to `4000` – ℕʘʘḆḽḘ Jun 09 '18 at 11:43
  • @eliasah haha no with my real big dataset with about 40million obs – ℕʘʘḆḽḘ Jun 09 '18 at 15:11
  • 1
    @ℕʘʘḆḽḘ What about your cluster setup ? (memory, resources manager, etc.) – eliasah Jun 11 '18 at 13:32
  • @eliasah is there a way to actually export that information easily? – ℕʘʘḆḽḘ Jun 11 '18 at 13:32
  • @ℕʘʘḆḽḘ I don't think there is something out of the box. – eliasah Jun 11 '18 at 13:34
  • OK, in any case I think this is a great answer. I am going to accept it :) Just a follow up, could you please expand a bit about the meaning of repartitioning the data? what it means to use, say , `sdf_repartition` to 10? – ℕʘʘḆḽḘ Jun 11 '18 at 13:37
  • cc. @JacekLaskowski ;) – eliasah Jun 12 '18 at 09:02
0

Can you please provide the full error traceback?

My guess is that you're running out of memory. Random forest and gbt trees are ensemble models, so they require more memory and computational power than naive bayes.

Try repartitioning the data (spark.sparkContext.defaultParallelism value is a good place to start) so that each of your workers gets a smaller and more evenly distributed chunk.

If that doesn't work, try reducing your max_memory_in_mb parameter to 256.

kshell
  • 236
  • 1
  • 6
  • no but wait, I have a huge cluster and this dataset is relatively tiny. I do not think this can be due to a memory error. Can you please try on your side and tell me what error you get? Why would reduce `max_memory_in_mb` be useful here? Thanks – ℕʘʘḆḽḘ Jun 07 '18 at 22:51