
I am trying to read data from a Kafka topic and write each micro-batch back to Kafka using foreachBatch(), like below.

from pyspark.sql import DataFrame, SparkSession

def write_stream_batches(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str, kafkaconfig: dict):
    query = kafka_df.writeStream \
        .format('kafka') \
        .foreachBatch(join_kafka_streams) \
        .option('checkpointLocation', checkpoint_location) \
        .start()
    query.awaitTermination()

def join_kafka_streams(kafka_df: DataFrame, batch_id: int):
    main_df = spark.sql('select * from table where some_filter_including_partitions')
    join_df = kafka_df.join(main_df, ['key_col1', 'key_col2', 'key_col3', 'key_col4'], 'inner')
    join_df.write.format('kafka') \
        .option('kafka.bootstrap.servers', kafkaconfig['kafka_broker']) \
        .option('kafka.batch.size', kafkaconfig['kafka_batch_size']) \
        .option('retries', kafkaconfig['retries']) \
        .option('kafka.max.request.size', kafkaconfig['kafka_max_request_size']) \
        .option('kafka.max.block.ms', kafkaconfig['kafka_max_block_ms']) \
        .option('kafka.metadata.max.age.ms', kafkaconfig['kafka_metadata_max_age_ms']) \
        .option('kafka.request.timeout.ms', kafkaconfig['kafka_request_timeout_ms']) \
        .option('kafka.linger.ms', kafkaconfig['kafka_linger_ms']) \
        .option('kafka.delivery.timeout.ms', kafkaconfig['kafka_delivery_timeout_ms']) \
        .option('acks', kafkaconfig['acks']) \
        .option('kafka.compression.type', kafkaconfig['kafka_compression_type']) \
        .option('kafka.security.protocol', kafkaconfig['kafka_security_protocol']) \
        .option('kafka.sasl.jaas.config', oauth_config) \
        .option('kafka.sasl.login.callback.handler.class', kafkaconfig['kafka_sasl_login_callback_handler_class']) \
        .option('kafka.sasl.mechanism', kafkaconfig['kafka_sasl_mechanism']) \
        .option('topic', topic_name) \
        .save()

The data in kafka_df is around 2.5 million records and the data from main_df is 4 million. When I start the job, the join result contains 900k records, and after loading 100k records the job fails with the below exception after running for 25 minutes.

py4j.protocol.Py4JJavaError: An error occurred while calling o500.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 66, 100.67.55.233, executor 0): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Expiring 13 record(s) for x1-dev-asw32-edr-02a1-ba87-332c7da70fc1-topic_name:130000 ms has passed since batch creation
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:999)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:70)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:91)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:999)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:421)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

I am submitting the job on my Databricks cluster. Is the exception above due to a session timeout, or is it because of memory issues? Could anyone let me know what is causing the exception? Any help is much appreciated.

  • The exception is due to a producer batch timeout. You can set `kafka.batch.size=0` to disable batching – OneCricketeer Jan 25 '22 at 18:48
  • ok, if I set kafka.batch.size=0, does it mean every time I push data into the Kafka topic all of the data is considered as a single batch, or does Kafka still split the data into multiple smaller batches to process it? – Metadata Jan 26 '22 at 17:05
  • From docs - _a batch size of zero will disable batching entirely_. There will still be multiple requests per partition, though. – OneCricketeer Jan 26 '22 at 17:35
  • What values of `kafkaconfig['kafka_request_timeout_ms']` and `kafkaconfig['kafka_batch_size']` do you use? – oskarryn Jan 04 '23 at 21:33

1 Answer


In my case these Kafka timeout exceptions went away after repartitioning the dataframe to more partitions before writing. The dataframe had a low number of partitions because the input Kafka topic had few partitions (by default Spark creates one partition per Kafka partition), which led to a low number of tasks when writing to the other Kafka topic. When I repartitioned the dataframe to ~50 partitions, the Kafka producer appeared to use more threads and managed to write the queued data before the records expired.
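A minimal sketch of that change inside the question's foreachBatch function (50 is simply the partition count that worked for me; spark, kafkaconfig and topic_name are the same placeholders as in the question):

def join_kafka_streams(kafka_df: DataFrame, batch_id: int):
    main_df = spark.sql('select * from table where some_filter_including_partitions')
    join_df = kafka_df.join(main_df, ['key_col1', 'key_col2', 'key_col3', 'key_col4'], 'inner')
    # Repartition before the Kafka write so the batch is written by more parallel tasks
    join_df.repartition(50) \
        .write.format('kafka') \
        .option('kafka.bootstrap.servers', kafkaconfig['kafka_broker']) \
        .option('topic', topic_name) \
        .save()  # plus the other kafka.* producer options from the question, unchanged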

You can also increase the number of partitions in the Kafka topic you are writing to.
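For example, one way to do that from Python, assuming the kafka-python admin client is available (the partition count is only an example, and partitions can only ever be increased, never decreased):

from kafka.admin import KafkaAdminClient, NewPartitions

# Raise the total partition count of the target topic to 50
# (a secured cluster would also need the same SASL/SSL settings as the producer)
admin = KafkaAdminClient(bootstrap_servers=kafkaconfig['kafka_broker'])
admin.create_partitions({topic_name: NewPartitions(total_count=50)})
admin.close()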

Other tweaks include increasing the kafka.request.timeout.ms configuration from the default 30 seconds to e.g. 5 minutes, and increasing kafka.batch.size from the default 16 KB to e.g. 512 KB. With larger batches there are fewer requests and throughput increases, which helps to send records before they expire. For the record, this goes against the common advice to decrease the batch size from the default (see e.g. this answer), but small Kafka batches didn't help in my case and I don't see how they could: smaller batches mean more requests and lower throughput, so individual batches might be sent more quickly, but overall there would be more of them, and records would still expire, just in smaller buffers.
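Applied to the writer from the question, those two overrides would look roughly like this (300000 ms and 524288 bytes are just the example values mentioned above, not recommendations):

(join_df.write.format('kafka')
    .option('kafka.bootstrap.servers', kafkaconfig['kafka_broker'])
    # producer request timeout: 5 minutes instead of the default 30 seconds
    .option('kafka.request.timeout.ms', '300000')
    # producer batch size: 512 KB instead of the default 16 KB
    .option('kafka.batch.size', '524288')
    .option('topic', topic_name)
    .save())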

oskarryn