I am trying to implement a Spark Streaming application, but I am getting back an exception: "py4j.Py4JException: Method __getnewargs__([]) does not exist"
I do not understand the source of this exception. I read here that a SparkSession instance cannot be used outside of the driver, but I do not know whether I am doing that. I don't understand how to tell whether a given piece of code executes on the driver or on an executor. I understand the difference between transformations and actions (I think), but when it comes to streams and foreachRDD, I get lost.
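To illustrate my (possibly wrong) mental model of where things run, here is a minimal sketch - the names are made up for illustration and are not from my app:

def per_batch(rdd):
    # my understanding: the function passed to foreachRDD runs on the driver,
    # once per batch interval
    pair_rdd = rdd.map(lambda x: (x, 1))  # this lambda is serialized and runs on executors
    print(pair_rdd.count())  # the action is triggered from the driver

some_dstream.foreachRDD(per_batch)

Is this roughly correct, i.e. does everything referenced inside the lambdas have to be serializable, while the foreachRDD function itself can use driver-only objects?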
The app is a Spark Streaming application running on AWS EMR, reading data from AWS Kinesis. We submit it via spark-submit with --deploy-mode cluster. Each record in the stream is a JSON object of the form:
{"type":"some string","state":"an escaped JSON string"}
E.g.:
{"type":"type1","state":"{\"some_property\":\"some value\"}"}
Here is my app in its current state:
import json
import logging

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

# Project-specific imports for the handlers are omitted here.
# Each handler subclasses from BaseHandler and has the method
#   def process(self, df, df_writer, base_writer_path)
# Each handler's process method performs additional transformations.
# df_writer is a function which writes a DataFrame to some S3 location.
HANDLER_MAP = {
    'type1': Type1Handler(),
    'type2': Type2Handler(),
    'type3': Type3Handler()
}
FORMAT = 'MyProject %(asctime)s %(levelname)s %(name)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)
# Use a closure and lambda to create streaming context
create = lambda: create_streaming_context(
    spark_app_name=spark_app_name,
    kinesis_stream_name=kinesis_stream_name,
    kinesis_endpoint=kinesis_endpoint,
    kinesis_region=kinesis_region,
    initial_position=InitialPositionInStream.LATEST,
    checkpoint_interval=checkpoint_interval,
    checkpoint_s3_path=checkpoint_s3_path,
    data_s3_path=data_s3_path)

streaming_context = StreamingContext.getOrCreate(checkpoint_s3_path, create)
streaming_context.start()
streaming_context.awaitTermination()
The function for creating the streaming context:
def create_streaming_context(
        spark_app_name, kinesis_stream_name, kinesis_endpoint,
        kinesis_region, initial_position, checkpoint_interval,
        data_s3_path, checkpoint_s3_path):
    """Create a new streaming context or reuse a checkpointed one."""
    # Spark configuration
    spark_conf = SparkConf()
    spark_conf.set('spark.streaming.blockInterval', 37500)
    spark_conf.setAppName(spark_app_name)

    # Spark context
    spark_context = SparkContext(conf=spark_conf)

    # Spark streaming context
    streaming_context = StreamingContext(spark_context, batchDuration=300)
    streaming_context.checkpoint(checkpoint_s3_path)

    # Spark session
    spark_session = get_spark_session_instance(spark_conf)

    # Set up stream processing
    stream = KinesisUtils.createStream(
        streaming_context, spark_app_name, kinesis_stream_name,
        kinesis_endpoint, kinesis_region, initial_position,
        checkpoint_interval)

    # Each record in the stream is a JSON object in the form:
    # {"type": "some string", "state": "an escaped JSON string"}
    json_stream = stream.map(json.loads)

    for state_type in HANDLER_MAP.iterkeys():
        filter_stream(json_stream, spark_session, state_type, data_s3_path)

    return streaming_context
The function get_spark_session_instance returns a global SparkSession instance (copied from here):
def get_spark_session_instance(spark_conf):
    """Lazily instantiated global instance of SparkSession"""
    logger.info('Obtaining global SparkSession instance...')
    if 'sparkSessionSingletonInstance' not in globals():
        logger.info('Global SparkSession instance does not exist, creating it...')
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=spark_conf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']
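For what it's worth, if I am reading the example I copied this from correctly, it obtains the session inside the foreachRDD callback from the RDD's own context, rather than capturing a session that was created on the driver during setup. Roughly like this sketch (the words/Row names come from that example, not from my app):

from pyspark.sql import Row

def process(time, rdd):
    # session obtained inside the callback, from the RDD's own context
    spark = get_spark_session_instance(rdd.context.getConf())
    row_rdd = rdd.map(lambda w: Row(word=w))
    words_df = spark.createDataFrame(row_rdd)
    words_df.show()

words.foreachRDD(process)

Is that difference (capturing spark_session in a lambda on the driver vs. obtaining it inside the callback) relevant to this exception?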
The filter_stream function is intended to filter the stream by the type property in the JSON and to transform it into a stream where each record is the escaped JSON string from the "state" property of the original object:
def filter_stream(json_stream, spark_session, state_type, data_s3_path):
    """Filter stream by type and process the stream."""
    state_type_stream = json_stream\
        .filter(lambda jsonObj: jsonObj['type'] == state_type)\
        .map(lambda jsonObj: jsonObj['state'])
    state_type_stream.foreachRDD(
        lambda rdd: process_rdd(spark_session, rdd, state_type, df_writer, data_s3_path))
The process_rdd function is intended to load the JSON into a DataFrame, using the correct schema for the type in the original JSON object. The handler instance returns a valid Spark schema and has a process method which performs further transformations on the DataFrame (after which df_writer is called and the DataFrame is written to S3):
def process_rdd(spark_session, rdd, state_type, df_writer, data_s3_path):
    """Process an RDD by state type."""
    if rdd.isEmpty():
        logger.info('RDD is empty, returning early.')
        return
    handler = HANDLER_MAP[state_type]
    df = spark_session.read.json(rdd, handler.get_schema())
    handler.process(df, df_writer, data_s3_path)
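For context, I believe the call in process_rdd boils down to something like the following, which (as far as I understand) is a valid way to read an RDD of JSON strings. This is a minimal sketch with a made-up schema, not my real handler schema:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('some_property', StringType(), True)])
json_rdd = spark_context.parallelize(['{"some_property":"some value"}'])
df = spark_session.read.json(json_rdd, schema)
df.show()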
Basically I am confused about the source of the exception. Is it related to how I am using spark_session.read.json? If so, how is it related? If not, is there something else in my code which is incorrect?
I originally thought everything worked correctly if I just replaced the call to StreamingContext.getOrCreate with the contents of the create_streaming_context function, but I was mistaken - I get the same exception either way. I think the checkpointing is a red herring; I am obviously doing something else incorrectly.
I would greatly appreciate any help with this problem and I'm happy to clarify anything or add additional information!