
I am running sparklyr with R on a local instance with 8 cores and 64 GB RAM. My job is to left_join a [50 000 000, 12] dataframe with a [20 000 000, 3] dataframe, which I run with Spark.

# Load packages
library(tidyverse)
library(sparklyr)


# Initialize configuration with defaults
config <- spark_config()

# Memory
# Set memory allocation for whole local Spark instance
# Sys.setenv("SPARK_MEM" = "50g")

# Set driver and executor memory allocations
# config$spark.driver.memory <- "8g"
# config$spark.driver.maxResultSize <- "8g"


# Connect to local cluster with custom configuration
sc <- spark_connect(master = "local", config = config, spark_home = spark_home_dir())


# Read df1 and df2
df1 <- spark_read_parquet(sc, 
                          path = "/mnt/df1/",
                          memory = FALSE, overwrite = TRUE)
df2 <- spark_read_parquet(sc, 
                          path = "/mnt/df2/",
                          memory = FALSE, overwrite = TRUE)


# Left join
df3 <- df1 %>%
  dplyr::left_join(df2)


# Write or collect
sparklyr::spark_write_parquet(df3, path="/mnt/") # or
df3 <- df3 %>% collect()

No matter how I adjust the Spark configuration, the code fails with a `java.lang.OutOfMemoryError: Java heap space`.

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 8.0 failed 1 times, most recent failure: Lost task 2.0 in stage 8.0 (TID 96, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space

So far I have tried different combinations of

Sys.setenv("SPARK_MEM" = "50g")
config["sparklyr.shell.driver-memory"] <- "20G"  
config["sparklyr.shell.num-executors"] <- 8  
config$spark.driver.maxResultSize <- "8g" 
config$spark.executor.memory <- "8g"  
config$spark.memory.fraction <- 0.9 

either in the R script or in the Spark configuration file.
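
For reference, one complete connection attempt with these settings looks like this (the values shown are just one of the combinations I tried):

Sys.setenv("SPARK_MEM" = "50g")

config <- spark_config()
config["sparklyr.shell.driver-memory"] <- "20G"  # heap of the driver JVM
config["sparklyr.shell.num-executors"] <- 8
config$spark.driver.maxResultSize <- "8g"        # cap on data collected back to the driver
config$spark.executor.memory <- "8g"
config$spark.memory.fraction <- 0.9

sc <- spark_connect(master = "local", config = config, spark_home = spark_home_dir())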

Similar questions have been asked (1, 2, 3), but none of them solved my problem.


1 Answer


You must pass a join key to `left_join()`.

Otherwise you are computing the Cartesian product, which has size [1 000 000 000 000 000, 15] (50 000 000 × 20 000 000 rows, 12 + 3 columns) and will certainly overflow memory.
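
For example, assuming the key columns are named var1 and var2 in both tables (placeholder names), the key can be passed explicitly:

# Join on an explicit key instead of relying on the default column matching
df3 <- df1 %>%
  dplyr::left_join(df2, by = c("var1", "var2"))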

Also, avoid calling `collect()` on big datasets: it moves all the data back to the driver and is very likely to cause an OOM there.
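
A sketch of the safer pattern, keeping the result in Spark and writing it straight to disk (the output path is illustrative; `sdf_nrow()` counts rows without bringing data back to the driver):

# Check the size of the result without collecting it
sparklyr::sdf_nrow(df3)

# Write the result from the executors directly to storage
sparklyr::spark_write_parquet(df3, path = "/mnt/df3/", mode = "overwrite")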

  • Thanks Pierre. Joining occurs on two variables that have the same names in both dataframes. I assume that R joins only on variables with matching names. However, I tried again, specifying the columns to join on with `by = c("var1", "var2")`, without further success. I understand that the `collect()` function should be used with caution, but the final dataframe has a `dim` of `[50 000 000, 13]` and should fit in memory. The error occurs regardless of whether I run `collect()` or try to write to disk. Therefore, the problem has to be with the joining function. – obruzzi Sep 07 '21 at 18:05
  • You're right about the default join key: I forgot that difference between sparklyr and SparkR. But if the key distribution is skewed (i.e. a few keys are very frequent), the output of the join can still be really big. You can explore the largest keys after joining with this: `df_stats = df1 %>% count(var1, var2, name="n1") %>% left_join(df2 %>% count(var1, var2, name="n2")) %>% mutate(n3=n1*coalesce(n2,1)) %>% arrange(desc(n3))`. Do you have keys with very big `n3`? – Pierre Gramme Sep 10 '21 at 15:29
  • Thanks for the input! I tried that, but the script failed due to memory issues (```java.lang.OutOfMemoryError: Java heap space```). Any other suggestions? – obruzzi Sep 22 '21 at 12:46
  • Weird. Do you also have the same error when writing the query directly in a Scala spark-shell? Maybe you can try to split this counting query: `s1 = df1 %>% count(var1, var2, name="n1") %>% arrange(desc(n1)) %>% collect()` and the same with df2, then join. You'll see which part is problematic. – Pierre Gramme Sep 28 '21 at 13:33
  • This works with `df2`, where all `n1` values are 1. With `df1` this fails. Would you recommend running a spark-shell in RStudio? If yes, would you have any instructions on how to do that? – obruzzi Sep 29 '21 at 13:37
  • I don't have much experience with standalone clusters. It will probably be easier to start the cluster as you usually do (from RStudio or not, as you want), and then open the spark-shell in a shell independent from RStudio (see [Spark doc](https://spark.apache.org/docs/latest/spark-standalone.html#connecting-an-application-to-the-cluster)). To get the right working directory & Spark port, `sparklyr::spark_install_dir()` might help. – Pierre Gramme Sep 30 '21 at 14:38
  • You can also try to further simplify the sparklyr query on df1, e.g. count only var1 or only var2, or do not sort the output, or begin with filtering rows. Then check whether any of these can run without OOM in sparklyr. This will give you a hint of where the problem is. – Pierre Gramme Sep 30 '21 at 15:26
  • Weirdly all four options work. 1. ```s1 = df1 %>% count(var1, name="n1") %>% collect()``` works (table size ```[8000,2]```), 2. ```s1 = df1 %>% count(var2, name="n1") %>% collect()``` works (table size ```[404000,2]```), 3. ```s1 = df2 %>% count(var1, name="n1") %>% collect()``` works (table size ```[7000,2]```) and 4. ```s1 = df2 %>% count(var2, name="n1") %>% collect()``` works (table size ```[30000,2]```) – obruzzi Oct 05 '21 at 12:36
  • However, ```s1 = df1 %>% count(var1, var2, name="n1") %>% collect()``` fails. To clarify, the dataset does not have 8000x404000 unique keys. And I can join these tables by looping over the data in smaller batches, but I would really like to employ Spark for this, as it is designed for data like mine. – obruzzi Oct 05 '21 at 12:46
  • Sorry for the delay. Weird results, again. Trying to spot where OOM occurs: `s1 = df1 %>% count(var1, var2, name="n1") %>% arrange(desc(n1))` and then test `s1 %>% head(100) %>% collect()` and `s1 %>% count()`. If these two work, it means that table s1 itself is maybe too big to be collected. How big is it? – Pierre Gramme Oct 27 '21 at 15:43
  • To better understand, you should also try the same experiment directly in Spark (or SparkR) with the same config, but without using sparklyr. – Pierre Gramme Oct 27 '21 at 15:45
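
Putting the diagnostic queries from these comments together, a consolidated sketch (with var1 and var2 as the join keys used above; each step can be run on its own to see where the OOM appears):

# Per-key counts in each table
s1 <- df1 %>% count(var1, var2, name = "n1")
s2 <- df2 %>% count(var1, var2, name = "n2")

# Estimated rows produced per key by the join: n1 * n2 (1 where df2 has no match)
df_stats <- s1 %>%
  left_join(s2, by = c("var1", "var2")) %>%
  mutate(n3 = n1 * coalesce(n2, 1)) %>%
  arrange(desc(n3))

# Inspect only the heaviest keys instead of collecting the whole table
df_stats %>% head(100) %>% collect()

# Number of distinct keys overall
df_stats %>% count() %>% collect()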