
I am presently working with an ASN.1 decoder. I will be getting a hexadecimal code from a producer and collecting it in a consumer. I will then convert the hex code to an RDD, pass that RDD to another function (decode_module) within the same class, and use the Python ASN.1 decoder to decode the hex data, return it, and print it. I don't understand what's wrong with my code; I have already installed my ASN.1 parser dependencies on the worker nodes too. Is something wrong with the way I call it in the lambda expression, or is it something else?

My ERROR: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063

Please help me, thank you.

My CODE:

import logging

from kafka import KafkaConsumer
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

# load_module, ASN1, PER, GLOBAL and config_values come from the
# ASN.1 parser setup and config file (not shown in the question).


class telco_cn:

    def __init__(self, sc):
        self.sc = sc
        print('in init function')
        logging.info('entered into init function')

    def decode_module(self, msg):
        try:
            logging.info('Entered into decode module')
            ### Providing input for module we need to load
            load_module(config_values['load_module'])
            ### Providing value for type of decoding
            ASN1.ASN1Obj.CODEC = config_values['PER_DECODER']
            ### Providing input for Aligned/Unaligned
            PER.VARIANT = config_values['PER_ALIGNED']
            ### Providing input for PDU load
            pdu = GLOBAL.TYPE[config_values['pdu_load']]
            ### Providing hex value to buf
            buf = '{}'.format(msg).decode('hex')
            # Decode the buffer and read back the decoded value
            # (assuming a libmich-style ASN.1 API)
            pdu.decode(buf)
            val = pdu()
            return val
        except Exception as e:
            logging.debug('error in decode_module function %s' % str(e))

    def consumer_input(self, sc, k_topic):
        logging.info('entered into consumer input')
        print(k_topic)
        consumer = KafkaConsumer()  # broker IP and other values given (elided)
        consumer.subscribe([k_topic])
        for msg in consumer:
            print(msg.value)
            a = sc.parallelize([msg.value])
            d = a.map(lambda x: self.decode_module(x)).collect()
            print(d)


if __name__ == "__main__":
    logging.info('Entered into main')
    conf = SparkConf()
    conf.setAppName('telco_consumer')
    conf.setMaster('yarn-client')
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)
    cn = telco_cn(sc)
    cn.consumer_input(sc, config_values['kafka_topic'])
Rahul

3 Answers

5

This is because self.decode_module is an instance method: referencing it inside the lambda forces PySpark to serialize self, and self contains the SparkContext.

To fix your code, you can use @staticmethod:

from pyspark import SparkConf, SparkContext

class telco_cn:
    def __init__(self, sc):
        self.sc = sc

    @staticmethod
    def decode_module(msg):
        return msg

    def consumer_input(self, sc, k_topic):
        a = sc.parallelize(list('abcd'))
        d = a.map(lambda x: telco_cn.decode_module(x)).collect()
        print(d)


if __name__ == "__main__":
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    cn = telco_cn(sc)
    cn.consumer_input(sc, '')
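
With @staticmethod there is no bound self: the lambda only references telco_cn.decode_module, so PySpark can pickle it and ship it to the executors without serializing the object that holds the SparkContext.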

For more information:

http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

Zhang Tong
  • Thanks for your answer. Can you explain what happens if we use a static method there versus if we don't? – Rahul Jun 02 '17 at 20:46
  • When I try the above code, it returns this error: TypeError: 'JavaPackage' object is not callable – Rahul Jun 02 '17 at 21:06
0

You cannot reference the instance method (self.decode_module) inside the lambda expression, because the instance object contains a SparkContext reference.

This occurs because PySpark internally tries to pickle everything it needs to send to its workers. So when you tell it to execute self.decode_module() on the worker nodes, PySpark tries to pickle the whole self object, which contains a reference to the SparkContext.

To fix that, you just need to remove the SparkContext reference from the telco_cn class and use a different approach, such as using the SparkContext before calling the class instance (as Zhang Tong's answer suggests).
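
A minimal sketch of the same fix without @staticmethod: keep the decoding logic in a module-level function, so nothing referenced inside the lambda carries a SparkContext (decode_module below is only a placeholder for the real ASN.1 logic):

# Sketch: a module-level function instead of an instance method.
# decode_module is a stand-in for the OP's real ASN.1 decoding logic.
def decode_module(msg):
    return msg

class telco_cn:
    def __init__(self, sc):
        self.sc = sc

    def consumer_input(self, sc, k_topic):
        a = sc.parallelize(list('abcd'))
        # A plain function captures no self, so no SparkContext gets pickled.
        d = a.map(decode_module).collect()
        print(d)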

-1

In my case, the issue was:

# text_df is assumed to be a DataFrame with a 'text' column
convertUDF = udf(lambda z: my_fynction(z), StringType())
cleaned_fun = text_df.withColumn('cleaned', udf(convertUDF, StringType())('text'))

I was applying udf() twice: convertUDF was already a UDF, and then I wrapped it in udf() again. I just did this instead:

convertUDF = lambda z: my_fynction(z)
cleaned_fun = text_df.withColumn('cleaned', udf(convertUDF, StringType())('text'))

and that solved the error.
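
Equivalently, since udf() already wraps a plain Python function, the intermediate lambda can be dropped entirely (a sketch, assuming my_fynction takes and returns a string):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

cleaned_fun = text_df.withColumn('cleaned', udf(my_fynction, StringType())('text'))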

yogender