I have a PySpark program with multiple independent modules, each of which can process data on its own to meet my various needs. They can also be chained together to process data in a pipeline. Each module builds a SparkSession and executes perfectly on its own.
However, when I try to run them serially within the same Python process, I run into issues. The moment the second module in the pipeline executes, Spark complains that the SparkContext I am attempting to use has been stopped:
py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
Each of these modules builds a SparkSession at the beginning of its execution and stops that session (and with it the underlying SparkContext) at the end of its processing. I build and stop sessions/contexts like so:
session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()  # also stops the session's underlying SparkContext
According to the official documentation, getOrCreate "gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder." But I don't want this behavior of reusing an existing session. I can't find any way to disable it, and I can't figure out how to destroy the session -- I only know how to stop its associated SparkContext.
How can I build a new SparkSession in each independent module and execute those modules in sequence in the same Python process, without previous sessions interfering with the newly created ones?
The following is an example of the project structure:
main.py
import collect
import process
if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)
collect.py
from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result
process.py
from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result