
I just realized that I am calling the following code many times, and that doesn't seem right:

spark = SparkSession.builder.getOrCreate()

Some steps of my code run in worker context, so the Spark session created on the driver is not available to the workers.

I know that the getOrCreate() method checks whether there is already a global session available, so it may not always create a new one, but this still forces me to ask for the Spark session again and again.
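
For example, on the driver the reuse is easy to see (a minimal sketch, just to illustrate what I mean):

from pyspark.sql import SparkSession

spark1 = SparkSession.builder.getOrCreate()
spark2 = SparkSession.builder.getOrCreate()
print(spark1 is spark2)  # True: the second call returns the already-active session instead of creating a new one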

I checked around and saw people passing the session as an argument to UDF or foreach functions, but I couldn't find much about it.

So, what is the proper way to access Spark from inside a worker?

EDIT: Added my use case below / changed step details

Maybe my use case gets clearer with the list below:

 1. Get data from eventhub. 
 2. Save data to delta table
 3. Query distinct IDs
 4. Foreach ID
  4.1. Query other database to get info about the body based on the ID
  4.2. For each row using UDF function (CSV)
   4.2.1. Transform csv into dataframe and return list of tuples
  4.3. Merge all dataframes using flatMap on the rows
  4.4. Write to somewhere

I am receiving messages from eventhub and each message has a CSV body and an ID.

Each message may be completely different from the others, and if so, at the end I am going to save each one in a different DW table.

So in order to do that I chose the following strategy:

First, save all the CSV bodies and IDs in a generic Delta table, just as they arrived (I am partitioning by ID).
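
In code this step looks roughly like the sketch below (the dataframe name dfRawMessages is illustrative, not my real code; allInOneLocation is the same path used further down):

(dfRawMessages.write          # dataframe holding the raw ID + CSV body columns from eventhub
    .format("delta")
    .mode("append")
    .partitionBy("SdIds")     # partitioning by ID so each ID can later be read back as one batch
    .save(allInOneLocation))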

Now I can query all the data related to each ID, one ID at a time, which makes it possible to process all the data related to that ID in a single batch.
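
Simplified, this is roughly how the dfSdIds dataframe used in the code further down is built:

spark = SparkSession.builder.getOrCreate()
dfSdIds = spark.sql('select distinct SdIds from delta.`/dbfs/' + allInOneLocation + '`')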

When I query all the body data for a specific ID, I get X rows, and I need to iterate over them, transforming the CSV body of each row into a dataframe.

After that, I merge all the dataframes into one and save it to the right table in DW.

For each distinct ID, I use Spark to get the info about the body, and each CSV read or DW write is already being performed from inside a worker.

EDIT: Added some code for the people

4 Foreach ID

# dfSdIds is a dataframe containing all distinct ids that I want to iterate over
dfSdIds.rdd.foreach(SaveAggregatedBodyRows)

4.2 For each row using UDF function (CSV)

from pyspark.sql.functions import udf, lit

# mapping: a json structure that is going to generate the dataframe schema of the CSV inside the udf function
# argSchema: the expected udf return structure, ArrayType(StructType(...))
def SaveAggregatedBodyRows(row):

    ...

    spark = SparkSession.builder.getOrCreate()
    dfCsvBody = spark.sql('select body from delta.`/dbfs/' + allInOneLocation + '` where SdIds = {}'.format(sdid))

    UdfConvertCSVToDF = udf(lambda body, mapping: ConvertCSVToDF(body, mapping), argSchema)
    dfConvertedBody = dfCsvBody.withColumn('body', UdfConvertCSVToDF(dfCsvBody.body, lit(mapping)))

4.2.1 Transform csv into dataframe and return list of tuples

def ConvertCSVToDF(body, mapping):

    ...

    spark = SparkSession.builder.getOrCreate()
    csvData = spark.sparkContext.parallelize(splittedBody)

    df = (spark.read
        .option("header", True)
        .option("delimiter", delimiter)
        .option("quote", quote)
        .option("nullValue", nullValue)
        .schema(schema)
        .csv(csvData))

    return list(map(tuple, df.select('*').collect()))

4.3 Merge all dataframes using flatMap on the rows

# mapSchema is the same as argSchema but without ArrayType
flatRdd = dfConvertedBody.rdd.flatMap(lambda x: x).flatMap(lambda x: x)      
dfMerged = flatRdd.toDF(mapSchema)
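
Just for reference, argSchema and mapSchema are roughly of this shape (the column names here are only an illustration; the real schema is generated from mapping):

from pyspark.sql.types import ArrayType, StructType, StructField, StringType

mapSchema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
])
argSchema = ArrayType(mapSchema)  # the udf returns a list of rows, hence the ArrayType wrapper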

4.4 Write to somewhere

(dfMerged.write
   .format(savingFileFormat)
   .mode("append")
   .option("checkpointLocation", checkpointLocation)
   .save(tableLocation)) 

I know there is a lot to improve in this code, but I am writing it as I learn PySpark.

This question has become way bigger than I expected, but the point of it is that I called

spark = SparkSession.builder.getOrCreate() 

on the driver, inside the method SaveAggregatedBodyRows AND inside the method ConvertCSVToDF.

People said it wouldn't work, but it does.

Flavio Pegas
  • That's an interesting use case that I've never encountered before. Do you mind describing what you're trying to accomplish in more detail? – Charlie Flowers Jul 05 '19 at 22:45
  • Sure! I am gonna split it in a few comments, so I can explain it with a bit more detail. I am receiving messages from eventhub. – Flavio Pegas Jul 06 '19 at 00:38
  • Each message may be completely different from another and if so, each one is gonna be saved in a different DW table. However, all messages have a body and an Id. – Flavio Pegas Jul 06 '19 at 00:40
  • So the strategy is: Save them all in a generic Delta table, so then I can query it by Id and process the body in batches. Each body is a CSV with about 200 lines – Flavio Pegas Jul 06 '19 at 00:47
  • When I query from Delta all body data of a specific ID, I have x rows, and I need to iterate over them using spark to transform CSV to dataframe and then return a dataframe containing many dataframes so then I can merge them all and write it to DW in a single big batch – Flavio Pegas Jul 06 '19 at 00:51
  • Each read of csv and write to dw is going to be performed from inside a worker, since it is all being processed in a foreach (for each id in the Delta table) – Flavio Pegas Jul 06 '19 at 00:56
  • @FlavioDiasPs: It sounds like you want to use nested RDD/dataframe (looping over one and creating another in each iteration)? That is not allowed in Spark, see e.g.: https://stackoverflow.com/questions/23793117/nullpointerexception-in-scala-spark-appears-to-be-caused-be-collection-type. – Shaido Jul 06 '19 at 03:25
  • @FlavioDiasPs you can not access SparkSession, SparkContext, Dataframes/Datasets, RDDs from executor code. These entities are available only from the driver code, please check the discussion [here](https://stackoverflow.com/questions/47358177/caused-by-java-lang-nullpointerexception-at-org-apache-spark-sql-dataset?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa) – abiratsis Jul 07 '19 at 13:15
  • Possible duplicate of [Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset](https://stackoverflow.com/questions/47358177/caused-by-java-lang-nullpointerexception-at-org-apache-spark-sql-dataset) – abiratsis Jul 07 '19 at 13:16
  • Based on your use-case (which would look much nicer in question body than in comments), can this help? https://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – mazaneicha Jul 07 '19 at 14:52
  • @AlexandrosBiratsis thanks for the answer, but this is not what I am asking. My code is already working. I am not having problems with access between driver and worker. I added my use case to the question. You can see there that after step 4, it all runs from inside a worker. – Flavio Pegas Jul 08 '19 at 17:31
  • @mazaneicha Thanks for the reply. I thought this question would have a very straightforward answer, but it got bigger than I expected. I edited my question as you said. Now, about the suggestion, I am already using partitionBy at step 2; after that, partitioning isn't necessary because I already have all the data of a single ID. – Flavio Pegas Jul 08 '19 at 17:35
  • I must admit, you have sort of lost me. – thebluephantom Aug 01 '19 at 18:01
  • I must admit that I am surprised it works. I tried to turn a worker into a driver but was not very lucky. – jgp Mar 24 '20 at 12:04

0 Answers