When I run this code on a Wikipedia dump in JSON format, after some time, at stage 2, I get java.lang.OutOfMemoryError: Java heap space. The complete dataset consists of about 700 JSON files of 4.1 MB each, with one record per line, in this format:
{"id":"1","revid":"1","url":"www.wikipedia.com","title":"title","text":"text 1"}
{"id":"2","revid":"2","url":"www.wikipedia.com/hi","title":"title2","text":"text 2"}
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, collect_list, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark import SparkConf
import pyspark
import time
import os
import psutil
# Stop any previously running session before applying the new configuration (no-op if none exists)
try:
    spark.stop()
except NameError:
    pass
# Spark configuration
conf = SparkConf()
conf.set("spark.driver.memory", "50g")
conf.set("spark.executor.memory", "50g")
#conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
#conf.set("spark.executor.extraJavaOptions", "-XX:MaxGCPauseMillis=100 -XX:G1HeapRegionSize=16m -XX:InitiatingHeapOccupancyPercent=35")
conf.set("spark.sql.shuffle.partitions", "1000")
conf.set("spark.memory.storageFraction", "0.8")
# Create a Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Get the SparkContext object
sc = spark.sparkContext
# Get the URL of the Spark web UI
ui_url = sc.uiWebUrl
# Print the URL of the Spark web UI
print("Spark web UI URL:", ui_url)
# Define the schema of the JSON dataset
json_schema = StructType([
StructField("id", StringType(), True),
StructField("revid", StringType(), True),
StructField("url", StringType(), True),
StructField("title", StringType(), True),
StructField("text", StringType(), True)
])
# Load all JSON files in the specified folder
data = spark.read.schema(json_schema).json("/Volumes/SSD_NVME/Spark/spark-3.4.0-bin-hadoop3/datasets/AA/*.json") \
.select("id", "title", "text")
#data = data.repartition(100)  # Set the desired number of partitions
# Extract tokens from the "title" and "text" fields
tokens = data.select("id", explode(split(regexp_replace(data.title, "[^a-zA-Z0-9]", " "), "\\s+")).alias("token")) \
.union(data.select("id", explode(split(regexp_replace(data.text, "[^a-zA-Z0-9]", " "), "\\s+")).alias("token")))
# Build the inverted index: count each token per document, then group postings by token
inverted_index = (
    tokens.rdd
    # one ((doc_id, token), 1) pair per token occurrence
    .map(lambda row: ((row.id, row.token), 1))
    # per-document term frequency
    .reduceByKey(lambda a, b: a + b)
    # re-key by token, carrying a one-element posting list [(doc_id, count)]
    .map(lambda item: (item[0][1], [(item[0][0], item[1])]))
    # concatenate the posting lists of each token
    .reduceByKey(lambda a, b: a + b)
    # final shape: (token, [doc_ids], total term frequency across all documents)
    .map(lambda item: (item[0], [x[0] for x in item[1]], sum(x[1] for x in item[1])))
)
# Define the DataFrame schema
schema = StructType([
StructField("token", StringType(), True),
StructField("documents", ArrayType(StringType()), True),
StructField("term_frequency", IntegerType(), True)
])
# Create the DataFrame
inverted_index_df = spark.createDataFrame(inverted_index, schema)
# Write the inverted index to the output path
inverted_index_df.write.mode("overwrite").json("/Volumes/SSD_NVME/output_AA_4")
# Stop the Spark session
#spark.stop()
I've tried changing the memory settings in the code, but it makes no difference: the Spark UI always shows the same amount of memory (see the Spark UI screenshot). With a smaller dataset the same code works without problems.
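For comparison, here is a minimal sketch of how driver memory is typically fixed before the JVM starts; the driver heap cannot be changed once the JVM is running, which is why setting spark.driver.memory in code from an already-running shell or notebook does not show up in the UI. The 50g value and the script name below are illustrative assumptions, not taken from my setup.

import os
# Assumption: this runs in a fresh Python process, before any SparkSession/SparkContext exists.
# The driver heap is fixed when the JVM is launched, so pass the option to spark-submit here.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 50g pyspark-shell"

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Check what the driver actually received (prints "not set" if the option was not applied)
print(spark.sparkContext.getConf().get("spark.driver.memory", "not set"))

The equivalent from the command line would be spark-submit --driver-memory 50g your_script.py (the script name is hypothetical).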