
Snippets of relevant code:

File 1: master.py

# Spark Imports
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext

# Import self-defined function
from helper import enrichment


def ingestion(sc, ssc):
    # Work with the stream (zkQuorum and topic are defined elsewhere)
    kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "streaming-consumer", {topic: 1})
    # ... parsing of kafkaStream into kafkaStream_json omitted from the snippet ...
    # Call the function defined in helper.py
    enriched_data = kafkaStream_json.map(lambda single_log: enrichment(single_log, client_id, machine_id))

if __name__ == "__main__":
    # Name of Spark App
    conf = SparkConf().setAppName("Test")

    # Spark and Spark streaming configuration
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    ingestion(sc, ssc)
    # Start the stream and keep it running unless terminated
    ssc.start()
    ssc.awaitTermination()

File 2: helper.py

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pandas as pd

def enrichment(single_log, client_id, machine_id):
    test_df = pd.DataFrame(some operations...)
    spark_df = sqlContext.createDataFrame(test_df)
    ...

Issues faced:

The streaming part of this works fine; however, when I call the enrichment function, I run into the following problems, depending on how I call it:

Case 1: When the above example is run as-is, it says:

spark_df = sqlContext.createDataFrame(test_df)
NameError: global name 'sqlContext' is not defined

Case 2: When I pass the SparkContext as an argument, this is the message that shows up:

"Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063."

This is the closest solution I have found: ERROR: SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063

However, it does not seem to resolve my issue. Any leads would be appreciated.

I need to keep these as two separate files; inlining will not work. The code is run using:

sudo $SPARK_HOME/spark-submit --master local[2] /home/user/master.py

steven

1 Answer


I think you should use SparkSession.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()
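If this is wired into the question's master.py, the streaming context can still be built from the session's underlying SparkContext. A minimal sketch, reusing the question's app name and 1-second batch interval:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.appName("Test").getOrCreate()
sc = spark.sparkContext          # the SparkContext behind the session
ssc = StreamingContext(sc, 1)    # 1-second batches, as in the question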

You can pass spark as an argument to the enrichment function:

def enrichment(spark):
    test_df = pd.DataFrame(some operations...)
    spark_df = spark.createDataFrame(test_df)
    ...

or:

def enrichment():
    spark = SparkSession.builder.getOrCreate()
    test_df = pd.DataFrame(some operations...)
    spark_df = spark.createDataFrame(test_df)
    ...
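For completeness, a sketch of how the second variant could slot into the question's helper.py, keeping the original (single_log, client_id, machine_id) signature so the call in master.py stays unchanged. The pandas operations are still the question's placeholder:

# helper.py -- second variant with the question's own signature
import pandas as pd
from pyspark.sql import SparkSession

def enrichment(single_log, client_id, machine_id):
    # getOrCreate() picks up the already-running session instead of
    # capturing the driver-side context in the closure
    spark = SparkSession.builder.getOrCreate()
    test_df = pd.DataFrame(...)              # some operations, as in the question
    spark_df = spark.createDataFrame(test_df)
    ...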
merenptah
  • I used the second example, since the first one does not work. But this runs only if I use it in standalone mode; it does not run on YARN. Any idea why? – steven Sep 10 '18 at 12:10