I just realized that I am calling the following line many times, and that doesn't seem right:
spark = SparkSession.builder.getOrCreate()
Some steps of my code run in worker context, so the Spark session created on the driver is not available to the worker.
I know that getOrCreate() checks whether a global session is already available, so it may not always create a new one, but this still forces me to ask for the Spark session again and again.
I looked around and saw people passing the session as an argument to UDFs or foreach functions, but I couldn't find much about it.
So, what is the proper way to access Spark from inside a worker?
EDIT: Added my use case below / changed the step details
Maybe my use case gets clearer with the list below:
1. Get data from eventhub.
2. Save data to delta table
3. Query distinct IDs
4. Foreach ID
4.1. Query other database to get info about the body based on the ID
4.2. For each row using UDF function (CSV)
4.2.1. Transform csv into dataframe and return list of tuples
4.3. Merge all dataframes using flatMap on the rows
4.4. Write to somewhere
I am receiving messages from Event Hub, and each message has a CSV body and an ID.
Messages may be completely different from one another, and if so, each one ends up in a different DW table at the end.
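For reference, step 1 looks roughly like the sketch below. I am using the azure-eventhubs-spark connector; ehConf and the idea that the ID travels in the message properties are placeholders, not my exact setup.
from pyspark.sql.functions import col

# Rough sketch of step 1: read from Event Hub and keep the ID and the CSV body
dfRaw = (spark.readStream
         .format("eventhubs")
         .options(**ehConf)
         .load()
         .select(col("properties").getItem("SdIds").alias("SdIds"),  # placeholder: wherever the ID really comes from
                 col("body").cast("string").alias("body")))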
So, in order to do that, I chose the following strategy:
First, save all the CSV bodies and IDs in a generic Delta table, just as they arrived (partitioned by ID), roughly as sketched below.
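In code, that first save is roughly this (shown in batch form for simplicity; allInOneLocation is the same path used in the query further down):
# Rough sketch of step 2: append the raw rows to one generic Delta table, partitioned by ID
(dfRaw.write
    .format("delta")
    .mode("append")
    .partitionBy("SdIds")
    .save('/dbfs/' + allInOneLocation))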
Now I can query all the data related to each ID, one ID at a time, which makes it possible to process everything for that ID in a single batch.
When I query all the body data for a specific ID, I get X rows, and I need to iterate over them, transforming the CSV body of each row into a DataFrame.
After that, I merge all the DataFrames into one and save it to the right DW table.
For each distinct ID, I use Spark to get the body data, and each CSV read or DW write is already being performed from inside a worker.
EDIT: Added some code for those who asked
4 Foreach ID
# dfSdIds is a dataframe containing all distinct ids that I want to iterate over
dfSdIds.rdd.foreach(SaveAggregatedBodyRows)
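For completeness, dfSdIds itself comes from something like this (step 3):
# Rough sketch of step 3: distinct IDs from the generic Delta table
dfSdIds = spark.sql('select distinct SdIds from delta.`/dbfs/' + allInOneLocation + '`')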
4.2 For each row using UDF function (CSV)
# mapping: a JSON structure used to build the DataFrame schema for the CSV inside the UDF
# argSchema: the expected UDF return type, ArrayType(StructType(...))
from pyspark.sql.functions import udf, lit

def SaveAggregatedBodyRows(row):
    ...
    spark = SparkSession.builder.getOrCreate()
    dfCsvBody = spark.sql('select body from delta.`/dbfs/' + allInOneLocation + '` where SdIds = {}'.format(sdid))
    UdfConvertCSVToDF = udf(lambda body, mapping: ConvertCSVToDF(body, mapping), argSchema)
    dfConvertedBody = dfCsvBody.withColumn('body', UdfConvertCSVToDF(dfCsvBody.body, lit(mapping)))
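Just to give an idea of those two arguments, a simplified, made-up mapping and the matching argSchema would look something like this (my real columns are different):
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

# Illustrative only: a simplified mapping and the matching UDF return schema
mapping = '{"columns": [{"name": "sensor", "type": "string"}, {"name": "value", "type": "double"}]}'
argSchema = ArrayType(StructType([
    StructField('sensor', StringType()),
    StructField('value', DoubleType())
]))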
4.2.1 Transform csv into dataframe and return list of tuples
def ConvertCSVToDF(body, mapping):
    ...
    spark = SparkSession.builder.getOrCreate()
    csvData = spark.sparkContext.parallelize(splittedBody)
    df = (spark.read
          .option("header", True)
          .option("delimiter", delimiter)
          .option("quote", quote)
          .option("nullValue", nullValue)
          .schema(schema)
          .csv(csvData))
    return list(map(tuple, df.select('*').collect()))
4.3 Merge all dataframes using flatMap on the rows
# mapSchema is the same as argSchema but without ArrayType
flatRdd = dfConvertedBody.rdd.flatMap(lambda x: x).flatMap(lambda x: x)
dfMerged = flatRdd.toDF(mapSchema)
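mapSchema can be taken straight from argSchema, something like:
# mapSchema is the element schema of argSchema (the StructType inside the ArrayType)
mapSchema = argSchema.elementType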
4.4 Write to somewhere
(dfMerged.write
.format(savingFileFormat)
.mode("append")
.option("checkpointLocation", checkpointLocation)
.save(tableLocation))
I know there is a lot to improve in this code, but I am writing it as I learn PySpark.
This question has become way bigger than I expected, but the point is that I call
spark = SparkSession.builder.getOrCreate()
on the driver, inside the method SaveAggregatedBodyRows, AND inside the method ConvertCSVToDF.
People said it wouldn't work, but it does.