
When I run this code on a Wikipedia dump in JSON format, after some time, at stage 2, I get java.lang.OutOfMemoryError: Java heap space. The complete dataset consists of about 700 JSON files of 4.1MB each, in this format: {"id":"1","revid":"1","url":"www.wikipedia.com","title":"title","text":"text 1"} {"id":"2","revid":"2","url":"www.wikipedia.com/hi","title":"title2","text":"text 2"}

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, collect_list, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark import SparkConf
import pyspark
import time
import os
import psutil

# Stop any previous Spark session before reconfiguring (no-op on a fresh run)
if "spark" in globals():
    spark.stop()

# Spark configuration
conf = SparkConf()
conf.set("spark.driver.memory", "50g")
conf.set("spark.executor.memory", "50g")
#conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
#conf.set("spark.executor.extraJavaOptions", "-XX:MaxGCPauseMillis=100 -XX:G1HeapRegionSize=16m -XX:InitiatingHeapOccupancyPercent=35")
conf.set("spark.sql.shuffle.partitions", "1000")
conf.set("spark.memory.storageFraction", "0.8")


# Create a Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Get the SparkContext object
sc = spark.sparkContext

# Get the Spark web UI URL
ui_url = sc.uiWebUrl

# Print the Spark web UI URL
print("Spark web UI URL:", ui_url)

# Define the schema of the JSON dataset
json_schema = StructType([
    StructField("id", StringType(), True),
    StructField("revid", StringType(), True),
    StructField("url", StringType(), True),
    StructField("title", StringType(), True),
    StructField("text", StringType(), True)
])

# Load all JSON files from the specified folder
data = spark.read.schema(json_schema).json("/Volumes/SSD_NVME/Spark/spark-3.4.0-bin-hadoop3/datasets/AA/*.json") \
    .select("id", "title", "text")
#data = data.repartition(100)  # Set the desired number of partitions

# Extract tokens from the "title" and "text" fields
tokens = data.select("id", explode(split(regexp_replace(data.title, "[^a-zA-Z0-9]", " "), "\\s+")).alias("token")) \
    .union(data.select("id", explode(split(regexp_replace(data.text, "[^a-zA-Z0-9]", " "), "\\s+")).alias("token")))

# Compute the token count per document and build the inverted index
inverted_index = tokens.rdd \
    .map(lambda row: ((row.id, row.token), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda item: (item[0][1], [(item[0][0], item[1])])) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda item: (item[0], [x[0] for x in item[1]], sum(x[1] for x in item[1])))

# Define the DataFrame schema
schema = StructType([
    StructField("token", StringType(), True),
    StructField("documents", ArrayType(StringType()), True),
    StructField("term_frequency", IntegerType(), True)
])

# Create the DataFrame
inverted_index_df = spark.createDataFrame(inverted_index, schema)

# Save the inverted index to an output file
inverted_index_df.write.mode("overwrite").json("/Volumes/SSD_NVME/output_AA_4")

# Stop the Spark session
#spark.stop()

I've tried changing the memory settings in the code, but nothing changes: in the UI I always see the same amount of memory (Spark UI screenshot). With a smaller dataset it works without problems.

1 Answer


Okay, so a few things I notice here that might help:

  • Your storage fraction is set to 0.8. Spark splits executor memory into a storage part and an execution (compute) part; by setting spark.memory.storageFraction to 0.8 you reserve 80% of that memory for storage, so even if you set 50g as your executor memory, you only get something like 10g for computation. The storage part is used for caching, which you don't do for any of the DataFrames, so I suggest using the default fraction (0.5) or even lowering the storage value.
  • In the image you provided, I see that you only have one executor (with 400MB of memory, which does not match what is specified in the code, but I will assume that is from some previous run). This does not make sense in a Spark context: you get no parallelism, and the full load is handled by that single executor. It makes more sense to split the 12 cores into e.g. 3 executors x 4 cores with 12g of memory each (see the sketch after this list).
  • 50g on the driver is almost never needed; I would argue you don't need more than 1g in your case. The driver does not do the heavy lifting here; it just schedules the tasks on the executors.
  • 1000 shuffle partitions is also overkill. Your input is about 2.3GB, and an ideal Spark partition for processing is around 100-200MB, so in your case something around 20 partitions should work. For shuffling you can use more if you have low executor memory, since that creates more tasks that each use less memory, making an OOM a bit less likely; however, 1000 is overkill for only ~2GB of data, and you will end up with a lot of empty tasks/partitions.
  • And in the end, after all of this, the main reason you get the OOM is simply that your executor does not have enough memory to process the incoming data. The image shows it has only 400MB of heap space. You can also see that the garbage collector is struggling, because it has to work overtime to free up space for new tasks.
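
To make the points above concrete, here is a rough sketch of a SparkConf along those lines. The exact numbers (3 executors with 4 cores and 12g each, ~20 shuffle partitions) are only the rough estimates from the list above, not tested values; spark.executor.instances only has an effect under a cluster manager such as YARN or Kubernetes, not in local mode, and as noted in the EDIT below some of these (especially the memory options) may need to be passed at launch time instead:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.executor.instances", "3")       # e.g. 3 executors... (cluster mode only)
conf.set("spark.executor.cores", "4")           # ...with 4 cores each (12 cores total)
conf.set("spark.executor.memory", "12g")        # 12g per executor instead of one 50g executor
conf.set("spark.driver.memory", "1g")           # the driver only schedules tasks here
conf.set("spark.sql.shuffle.partitions", "20")  # roughly 100-200MB per partition for ~2.3GB of input
# leave spark.memory.storageFraction at its default (0.5), since nothing is cached

spark = SparkSession.builder.config(conf=conf).getOrCreate()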

You can find a lot of OOM cases by googling or searching on SO if this does not help. This related SO question has a lot of good pointers.

EDIT: I am not 100% sure about this, but a lot of SparkConf properties cannot be changed from 'inside' the code; try setting the memory and executor options before starting the app (e.g. via spark-submit arguments, as in the example below). On the Executors page of the UI you will see how much memory they actually have at their disposal and use. So the image might actually reflect the real scenario, since it doesn't take the confs set inside the application into account.
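
For example, assuming the script is saved as inverted_index.py (a hypothetical name), the same settings could be passed at launch time so they are applied before the JVM starts; note that --num-executors only applies when running on YARN:

spark-submit \
  --driver-memory 1g \
  --executor-memory 12g \
  --executor-cores 4 \
  --num-executors 3 \
  inverted_index.py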