
I am trying to implement a Spark Streaming application, but I am getting an exception: "py4j.Py4JException: Method __getnewargs__([]) does not exist"

I do not understand the source of this exception. I read here that I cannot use a SparkSession instance outside of the driver, but I do not know whether I am doing that. I don't understand how to tell whether a given piece of code executes on the driver or on an executor - I understand the difference between transformations and actions (I think), but when it comes to streams and foreachRDD, I get lost.
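
To make my (possibly wrong) mental model concrete: I think the lambdas passed to map and filter are pickled and run on the executors, while the function passed to foreachRDD runs on the driver once per batch. A toy example of how I picture it (nothing here is from my real app, the names are made up):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='mental-model-example')
ssc = StreamingContext(sc, batchDuration=1)

lines = ssc.socketTextStream('localhost', 9999)

# As I understand it, this lambda is pickled and runs on the executors.
lengths = lines.map(lambda line: len(line))

# ...whereas the function given to foreachRDD runs on the driver once per
# batch, although transformations/actions on the rdd inside it still run
# on the executors.
def show(rdd):
    print(rdd.take(5))

lengths.foreachRDD(show)

ssc.start()
ssc.awaitTermination()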

The app is a Spark Streaming application running on AWS EMR, reading data from AWS Kinesis. We submit it via spark-submit with --deploy-mode cluster. Each record in the stream is a JSON object of the form:

{"type":"some string","state":"an escaped JSON string"}

E.g.:

{"type":"type1","state":"{\"some_property\":\"some value\"}"}

Here is my app in its current state:

# Each handler subclasses from BaseHandler and
# has the method
# def process(self, df, df_writer, base_writer_path)
# Each handler's process method performs additional transformations.
# df_writer is a function which writes a Dataframe to some S3 location.

HANDLER_MAP = {
    'type1': Type1Handler(),
    'type2': Type2Handler(),
    'type3': Type3Handler()
}

FORMAT = 'MyProject %(asctime)s %(levelname)s %(name)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

# Use a closure and lambda to create streaming context
create = lambda: create_streaming_context(
    spark_app_name=spark_app_name,
    kinesis_stream_name=kinesis_stream_name,
    kinesis_endpoint=kinesis_endpoint,
    kinesis_region=kinesis_region,
    initial_position=InitialPositionInStream.LATEST,
    checkpoint_interval=checkpoint_interval,
    checkpoint_s3_path=checkpoint_s3_path,
    data_s3_path=data_s3_path)

streaming_context = StreamingContext.getOrCreate(checkpoint_s3_path, create)

streaming_context.start()
streaming_context.awaitTermination()
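
To show the shape of the handlers (this is a stripped-down illustration, not my actual classes - the schema field and the S3 path suffix are placeholders, and the (df, s3_path) call to df_writer is just for the example):

from pyspark.sql.types import StructType, StructField, StringType

class BaseHandler(object):
    """Base class for handlers; subclasses supply a schema and a process method."""

    def get_schema(self):
        raise NotImplementedError

    def process(self, df, df_writer, base_writer_path):
        raise NotImplementedError


class Type1Handler(BaseHandler):

    def get_schema(self):
        # Schema for the un-escaped "state" JSON of type1 records.
        return StructType([StructField('some_property', StringType(), True)])

    def process(self, df, df_writer, base_writer_path):
        # The real handlers perform additional transformations on df here
        # before handing it off to df_writer.
        df_writer(df, base_writer_path + '/type1')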

The function for creating the streaming context:

def create_streaming_context(
    spark_app_name, kinesis_stream_name, kinesis_endpoint,
    kinesis_region, initial_position, checkpoint_interval,
    data_s3_path, checkpoint_s3_path):
    """Create a new streaming context or reuse a checkpointed one."""

    # Spark configuration
    spark_conf = SparkConf()
    spark_conf.set('spark.streaming.blockInterval', 37500)
    spark_conf.setAppName(spark_app_name)

    # Spark context
    spark_context = SparkContext(conf=spark_conf)

    # Spark streaming context
    streaming_context = StreamingContext(spark_context, batchDuration=300)
    streaming_context.checkpoint(checkpoint_s3_path)

    # Spark session
    spark_session = get_spark_session_instance(spark_conf)

    # Set up stream processing
    stream = KinesisUtils.createStream(
        streaming_context, spark_app_name, kinesis_stream_name,
        kinesis_endpoint, kinesis_region, initial_position,
        checkpoint_interval)

    # Each record in the stream is a JSON object in the form:
    # {"type": "some string", "state": "an escaped JSON string"}
    json_stream = stream.map(json.loads)

    for state_type in HANDLER_MAP.iterkeys():
        filter_stream(json_stream, spark_session, state_type, data_s3_path)

    return streaming_context

The function get_spark_session_instance returns a global SparkSession instance (copied from here):

def get_spark_session_instance(spark_conf):
    """Lazily instantiated global instance of SparkSession"""

    logger.info('Obtaining global SparkSession instance...')
    if 'sparkSessionSingletonInstance' not in globals():
        logger.info('Global SparkSession instance does not exist, creating it...')

        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=spark_conf)\
            .getOrCreate()

    return globals()['sparkSessionSingletonInstance']

The filter_stream function is intended to filter the stream by the "type" property in the JSON and transform it into a stream in which each record is the escaped JSON string from the "state" property of the original object:

def filter_stream(json_stream, spark_session, state_type, data_s3_path):
    """Filter stream by type and process the stream."""

    state_type_stream = json_stream\
        .filter(lambda jsonObj: jsonObj['type'] == state_type)\
        .map(lambda jsonObj: jsonObj['state'])

    state_type_stream.foreachRDD(lambda rdd: process_rdd(spark_session, rdd, state_type, df_writer, data_s3_path))

The process_rdd function is intended to load the JSON into a DataFrame, using the correct schema depending on the type in the original JSON object. The handler instance returns a valid Spark schema and has a process method which performs further transformations on the DataFrame (after which df_writer is called and the DataFrame is written to S3):

def process_rdd(spark_session, rdd, state_type, df_writer, data_s3_path):
    """Process an RDD by state type."""

    if rdd.isEmpty():
        logger.info('RDD is empty, returning early.')
        return

    handler = HANDLER_MAP[state_type]
    df = spark_session.read.json(rdd, handler.get_schema())
    handler.process(df, df_writer, data_s3_path)
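
df_writer itself is not shown above; it is a module-level function which writes a DataFrame to some S3 location (which is how filter_stream can refer to it without receiving it as a parameter). Roughly, with the (df, s3_path) signature and Parquet output chosen only for illustration:

def df_writer(df, s3_path):
    """Write a DataFrame out to S3 (simplified sketch, not the real implementation)."""
    df.write.mode('append').parquet(s3_path)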

Basically I am confused about the source of the exception. Is it related to how I am using spark_session.read.json? If so, how is it related? If not, is there something else in my code which is incorrect?

I originally thought everything worked correctly if I replaced the call to StreamingContext.getOrCreate with the contents of create_streaming_context, but I was mistaken - I get the same exception either way. So I think the checkpointing is a red herring and I am obviously doing something else incorrectly.

I would greatly appreciate any help with this problem and I'm happy to clarify anything or add additional information!
