rapache-sparkdplyrsparklyr

Running out of heap space in sparklyr, but have plenty of memory


I am getting heap space errors on even fairly small datasets. I can be sure that I'm not running out of system memory. For example, consider a dataset containing about 20M rows and 9 columns, and that takes up 1GB on disk. I am playing with it on a Google Compute node with 30gb of memory.

Let's say that I have this data in a dataframe called df. The following works fine, albeit somewhat slowly:

library(tidyverse) 
uniques <- search_raw_lt %>%
    group_by(my_key) %>%
    summarise() %>%
    ungroup()

The following throws java.lang.OutOfMemoryError: Java heap space.

library(tidyverse)
library(sparklyr)
sc <- spark_connect(master = "local")

df_tbl <- copy_to(sc, df)

unique_spark <- df_tbl %>%
  group_by(my_key) %>%
  summarise() %>%
  ungroup() %>%
  collect()

I tried this suggestion for increasing the heap space to Spark. The problem persists. Watching the machine's state on htop, I see that total memory usage never goes over about 10gb.

library(tidyverse)
library(sparklyr)

config <- spark_config()
config[["sparklyr.shell.conf"]] <- "spark.driver.extraJavaOptions=-XX:MaxHeapSize=24G"

sc <- spark_connect(master = "local")

df_tbl <- copy_to(sc, df)

unique_spark <- df_tbl %>%
  group_by(my_key) %>%
  summarise() %>%
  ungroup() %>%
  collect()

Finally, per Sandeep's comment, I tried lowering MaxHeapSize to 4G. (Is MaxHeapSize per virtual worker or for the entire Spark local instance?) I still got the heap space error, and again, I did not use much of the system's memory.


Solution

  • In looking into Sandeep's suggestions, I started digging into the sparklyr deployment notes. These mention that the driver might run out of memory at this stage, and to tweak some settings to correct it.

    These settings did not solve the problem, at least not initially. However, isolating the problem to the collect stage allowed me to find similar problems using SparkR on SO.

    These answers depended in part on setting the environment variable SPARK_MEM. Putting it all together, I got it to work as follows:

    library(tidyverse)
    library(sparklyr)
    
    # Set memory allocation for whole local Spark instance
    Sys.setenv("SPARK_MEM" = "13g")
    
    # Set driver and executor memory allocations
    config <- spark_config()
    config$spark.driver.memory <- "4G"
    config$spark.executor.memory <- "1G"
    
    # Connect to Spark instance
    sc <- spark_connect(master = "local")
    
    # Load data into Spark
    df_tbl <- copy_to(sc, df)
    
    # Summarise data
    uniques <- df_tbl %>%
      group_by(my_key) %>%
      summarise() %>%
      ungroup() %>%
      collect()