
I have GBs of data in S3 and I am trying to read it in parallel in my code, following the approach in the link below.

I am using the code below as a sample, but when I run it, it fails with the following error:

Any help on this is deeply appreciated, as I am very new to Spark.

EDIT: I have to read my S3 files in parallel, which is not explained in any other post. People marking this as a duplicate, please read the problem first.

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

class telco_cn:
    def __init__(self, sc):
        self.sc = sc

    def decode_module(msg):
        df = spark.read.json(msg)
        return df

    def consumer_input(self, sc, k_topic):
        a = sc.parallelize(['s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json'])
        d = a.map(lambda x: telco_cn.decode_module(x)).collect()
        print(d)

if __name__ == "__main__":
    cn = telco_cn(sc)
    cn.consumer_input(sc, '')
user7422128
  • Does this answer your question? [Spark: Broadcast variables: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation](https://stackoverflow.com/questions/31508689/spark-broadcast-variables-it-appears-that-you-are-attempting-to-reference-spar) – blackbishop Jan 02 '20 at 19:01
  • I have already seen the above example. There they are just trying to append some values to a dictionary, and my use case is very different: I am trying to read a JSON file into a DataFrame. @blackbishop – user7422128 Jan 03 '20 at 04:22

1 Answer


You are attempting to call spark.read.json from within a map operation on an RDD. Because that map runs on Spark's executor/worker nodes, you cannot reference a SparkContext/SparkSession variable (which exists only on the Spark driver) inside it. That is what the error message is telling you.

Why not just call df=spark.read.json('s3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json') directly?
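
To address the parallelism concern from the EDIT: spark.read.json also accepts a list of paths (or a glob pattern), and Spark distributes the read across its executors on its own, so no manual RDD map over the paths is needed. A minimal sketch, assuming the files share a schema; the second key is a hypothetical placeholder:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("s3-json-read").getOrCreate()

    # Spark splits the file listing into tasks and reads the objects in
    # parallel on the executors; no RDD map over the paths is required.
    paths = [
        's3://bucket1/1575158401-51e09537-0ce5-c775-6beb-fd1b0a568e15.json',
        's3://bucket1/some-other-file.json',  # hypothetical second key
    ]
    df = spark.read.json(paths)  # a glob such as 's3://bucket1/*.json' also works
    df.show()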

Charlie Flowers