I have a question about how local variables are captured by closures that operate on Spark RDDs. The problem I would like to solve is the following:
I have a list of text files that should end up in one RDD. However, I first need to attach additional information to the RDD created from each individual text file; this information is extracted from the file name. Afterwards, the per-file RDDs are combined into one big RDD using union().
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)
list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # extract_file_info() is my own helper that derives the owner from the
    # file name, e.g. extract_file_info('file_from_Owner.txt') == 'Owner'
    file_owner = extract_file_info(filename)
    tmp_rdd = tmp_rdd.map(lambda x: (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
However, this does not work: the result is that Bert always ends up as the owner of every line, never Ernie.
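To make the symptom concrete (the file contents below are invented for illustration, assuming one line per file), the collected result looks roughly like this:

expected = [('line from Ernie file', 'Ernie'), ('line from Bert file', 'Bert')]
actual   = [('line from Ernie file', 'Bert'),  ('line from Bert file', 'Bert')]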
The problem is that the lambda passed to map() inside the loop does not capture the “correct” file_owner; instead, it ends up seeing the last value that file_owner takes in the loop (a minimal plain-Python sketch of this behaviour follows the snippet below). On my local machine, I managed to fix the problem by calling cache() on each individual RDD:
# ..
tmp_rdd = tmp_rdd.map(lambda x: (x, file_owner))
tmp_rdd.cache()
# ..
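For reference, the same late-binding behaviour shows up without Spark at all; as far as I understand, this is simply how Python closures capture loop variables (a minimal sketch, nothing Spark-specific, with made-up data):

owners = ['Ernie', 'Bert']
tag_functions = []
for owner in owners:
    # each lambda closes over the variable 'owner', not its value at append time
    tag_functions.append(lambda line: (line, owner))

# by the time the lambdas are actually called, the loop is over and owner == 'Bert'
print(tag_functions[0]('some text'))  # ('some text', 'Bert'), not ('some text', 'Ernie')
print(tag_functions[1]('some text'))  # ('some text', 'Bert')

Since RDD transformations are lazy, the map() lambdas in my code are presumably only evaluated once an action such as collect() runs, i.e. after the loop has finished, which would explain why every record is tagged with 'Bert'.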
My Question: Is using cache() the correct solution for my problem? Are there any alternatives?
Many Thanks!