
I am trying to use the SparkContext.binaryFiles function to process a set of ZIP files. The setup is to map over an RDD of filenames, where the mapping function calls binaryFiles.

The problem is that SparkContext is referenced in the mapping function, and I'm getting this error. How can I fix it?

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Sample code:

file_list_rdd.map(lambda x: sc.binaryFiles("/FileStore/tables/xyz/" + x[1]))

where file_list_rdd is an RDD of (id, filename) tuples.

thebluephantom
fatdragon
  • Actually, in practice we rarely (almost never) transfer the SparkContext object from the driver to the workers. So why do you need to do that? – tauitdnmd Aug 27 '18 at 06:30
  • That's because the binaryFiles function is only available in the SparkContext class... See http://spark.apache.org/docs/latest/api/python/pyspark.html – fatdragon Aug 27 '18 at 06:40
  • Can you share some code, please? This is a well-known issue to consider. – thebluephantom Aug 27 '18 at 06:53
  • Updated question with sample code. I think my issue is that I want to have two levels - for each row in the top-level RDD, I want to create a new RDD. But the only way to create RDDs is to use SparkContext, and yet I cannot use SparkContext there... – fatdragon Aug 28 '18 at 05:56
  • Any value, the answer that is? – thebluephantom Aug 30 '18 at 10:00

1 Answer


It would appear that you need to call the function without referencing the SparkContext - if that is actually applicable in your case.

Also consider moving the function / def into the body of the map statement itself; that is commonly done - we are, after all, working in a functional style. In my experience, serialization errors like this are hard to resolve unless I resort to exactly that and move the defs into the executor-side logic, as sketched below.
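
For example, a minimal sketch of that idea (process_path and paths_rdd are illustrative names, not from your code): the function passed to map only uses its arguments, so nothing driver-only such as sc is captured in the closure:

 def process_path(record):
     # Pure function: it only touches its arguments and never the
     # SparkContext, so it pickles cleanly and runs on the executors.
     file_id, filename = record
     return (file_id, "/FileStore/tables/xyz/" + filename)

 paths_rdd = file_list_rdd.map(process_path)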

Some file processing is also done via the driver. This post could be of interest: How to paralelize spark etl more w/out losing info (in file names). Based on your code snippet, that is the approach I would be looking at here.

And you should use something like this from the driver, then process the results accordingly:

 zip_data = sc.binaryFiles('/user/path-to-folder-with-zips/*.zip')

Now sc is only used from the driver, which is the only place it can be used.
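
As a rough sketch of the "process accordingly" part (the extract_zip helper and files_rdd name are illustrative, not from the original answer), each (path, bytes) pair returned by binaryFiles can be unpacked on the executors with the standard zipfile module:

 import io
 import zipfile

 zip_data = sc.binaryFiles('/user/path-to-folder-with-zips/*.zip')

 def extract_zip(path_and_bytes):
     # Runs on the executors; it only sees the (path, content) tuple,
     # never the SparkContext, so there is nothing driver-only to pickle.
     path, content = path_and_bytes
     with zipfile.ZipFile(io.BytesIO(content)) as zf:
         return [(path, name, zf.read(name)) for name in zf.namelist()]

 files_rdd = zip_data.flatMap(extract_zip)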

thebluephantom