I use PySpark with Spark 2.4 in standalone mode on Linux to aggregate incoming data and write it into a database, currently from a Jupyter notebook for testing. The stripped-down code is:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foo").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:1234").option("subscribe", "bar").load()
df2 = df.withWatermark("timestamp", "1 second").groupBy(F.window("timestamp", "5 seconds")).agg(F.min("timestamp").alias("timestamp_window_min"))
def write_into_sink(df, epoch_id):
    df.write.jdbc(table="foo_agg", mode="append", [...])

query_write_sink = df2.writeStream.foreachBatch(write_into_sink).trigger(processingTime="1 second").start()
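For completeness: I do not set a checkpointLocation anywhere, so Spark picks the checkpoint/state directory itself. If it matters, a sketch of the same query with an explicit location would look roughly like this (the path is only a placeholder, not something I use):

# Sketch: same sink, but with an explicit checkpoint directory instead of
# letting Spark create a temporary one ("/data/spark/checkpoints/foo_agg"
# is just a placeholder path).
query_write_sink = df2.writeStream.foreachBatch(write_into_sink) \
    .option("checkpointLocation", "/data/spark/checkpoints/foo_agg") \
    .trigger(processingTime="1 second").start()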
After running for 2 hours, Spark's tmp directory contains dozens of directories under tmp/temporary-[...]/state/0/ with a lot of small .crc and .delta files that add up to 6 GB of disk space during the run. So my problem is that I cannot run the script for more than a few hours before the disk is full; how could I run it for longer, like days or even months? If I close/kill the Python kernel, the directories are purged.
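In case it helps with diagnosing, I can watch the state growing from the running query itself via lastProgress, roughly like this:

# Rough check of how much state the windowed aggregation keeps,
# read from the last progress report of the running query.
progress = query_write_sink.lastProgress
if progress is not None:
    for op in progress.get("stateOperators", []):
        print(op.get("numRowsTotal"), op.get("memoryUsedBytes"))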
I already followed Apache Spark does not delete temporary directories and adjusted conf/spark-env.sh to

SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true"

but it still does not help after a restart, since that setting only cleans up files after a Spark application has finished, not while it is running. I also tried

SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true spark.worker.cleanup.appDataTtl=120"

in the same file, with the same lack of effect.
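The only other idea I have is to set a default checkpoint directory myself when creating the session, but I am not sure this changes how the state files accumulate; a sketch of what I mean (the conf key is spark.sql.streaming.checkpointLocation, the path is again only a placeholder):

# Sketch: give all streaming queries of this session a fixed checkpoint
# root instead of the auto-created temporary directory
# ("/data/spark/checkpoints" is a placeholder path).
spark = SparkSession.builder.appName("foo") \
    .config("spark.sql.streaming.checkpointLocation", "/data/spark/checkpoints") \
    .getOrCreate()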
So, do you have an idea how to get rid of Spark's temporary files during the run?