
I have a Spark application which consumes from a Kinesis stream with 6 shards. Data is produced to Kinesis at a peak of 2,000 records/second; at non-peak times it comes in at only about 200 records/second. Each record is 0.5 KB, so 6 shards is enough to handle that.

I am using EMR 5.23.0, Spark 2.4.0, and spark-streaming-kinesis-asl 2.4.0. I have 6 r5.4xlarge nodes in my cluster, so there is plenty of memory.

Recently I have been trying to checkpoint the application to S3. I am testing this at non-peak time, so the incoming data rate is very low, around 200 records/sec. I run the Spark application by creating a new context; the checkpoint is created on S3, but when I kill the app and restart it, it fails to recover from the checkpoint. The error message is below, and my Spark UI shows all the batches are stuck:

19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last exception:
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000004 in stream my-stream-name under account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2782)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2749)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2738)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1383)
        at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1355)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
        at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
        ... 20 more
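
The "Gave up after 3 retries" in the first line matches what I understand to be the connector's default retry setting. If I read KinesisBackedBlockRDD in spark-streaming-kinesis-asl 2.4.0 correctly, both the retry count and the wait time between retries are configurable, so one thing I plan to try is raising them. A sketch, assuming these config keys are honored by my version (the values are guesses):

    import org.apache.spark.SparkConf

    // Assumption: these keys are read by KinesisBackedBlockRDD when it
    // re-fetches blocks from Kinesis during checkpoint recovery
    // (defaults appear to be 3 attempts and 100ms).
    val conf = new SparkConf()
      .set("spark.streaming.kinesis.retry.maxAttempts", "10")  // default: 3
      .set("spark.streaming.kinesis.retry.waitTime", "2000ms") // default: 100ms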

My batch interval is 2.5 minutes, and I create the streams the regular way:

    val kinesisStreams = (0 until 6).map { i =>
      KinesisUtils.createStream(
        ssc,
        "MyKinesisStreamName",            // KCL application name (DynamoDB lease table)
        settings.kinesis_stream,          // Kinesis stream name
        settings.kinesis_endpoint,
        settings.kinesis_region,
        settings.kinesis_initial_position,
        Milliseconds(settings.spark_batchinterval_seconds * 1000), // KCL checkpoint interval (= my batch interval)
        StorageLevel.MEMORY_AND_DISK)
    }

    val unionStreams = ssc.union(kinesisStreams)
      .map(linebytes => new String(linebytes, "UTF-8").replace("\"", ""))
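
For reference, here is roughly the same stream expressed with the KinesisInputDStream.builder API, which 2.4.0 documents as the replacement for the deprecated KinesisUtils.createStream. This is a sketch of one receiver (I actually build 6 and union them); the settings.* values are from my config, and the initial position shown is just an example:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Milliseconds
    import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .checkpointAppName("MyKinesisStreamName")   // KCL app name (DynamoDB lease table)
      .streamName(settings.kinesis_stream)
      .endpointUrl(settings.kinesis_endpoint)
      .regionName(settings.kinesis_region)
      .initialPosition(new KinesisInitialPositions.TrimHorizon()) // example; mine comes from settings
      .checkpointInterval(Milliseconds(settings.spark_batchinterval_seconds * 1000))
      .storageLevel(StorageLevel.MEMORY_AND_DISK)
      .build()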

Someone reported the same problem on the Spark JIRA, and it does not seem to have an answer:

http://mail-archives.apache.org/mod_mbox/spark-issues/201807.mbox/%3CJIRA.13175528.1532948992000.116869.1532949000171@Atlassian.JIRA%3E ("Checkpointing records with Amazon KCL throws ProvisionedThroughputExceededException")

Since consuming from Kinesis with checkpointing is such a common thing in Spark applications, I am wondering whether I did anything wrong. Is anyone else experiencing the same, and how did you resolve it?

I am wondering if this is because my batch interval of 2.5 minutes (150 seconds) is too long: 150 seconds × 200 records/second = 30,000 records per batch, and when checkpoint recovery tries to load those 30,000 records from Kinesis, it throws the error (see the back-of-the-envelope check below). Should I increase my shard count from 6 to 30?
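
Here is the back-of-the-envelope check behind that question, in case my reasoning is off (the per-shard limits are my understanding of the Kinesis quotas, not something I measured):

    // My observed rates, plus AWS's published per-shard read limits,
    // which I believe are 5 read transactions/sec and 2 MB/sec.
    val recordsPerBatch = 150 * 200                 // 2.5-min batch at 200 rec/s = 30,000 records
    val bytesPerBatch   = recordsPerBatch * 512L    // 0.5 KB/record ≈ 15 MB per batch
    val mbPerShard      = bytesPerBatch / 6.0 / (1024 * 1024) // ≈ 2.4 MB per shard
    // At 2 MB/s per shard, re-reading one batch is only ~1.2 s of throughput,
    // so raw volume alone should not exceed the limit. My guess is that many
    // concurrent recovery tasks hit GetShardIterator/GetRecords on the same
    // 6 shards at once and blow past the ~5 transactions/sec/shard limit
    // (stage 33 in the trace above has at least 572 tasks).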

Please help; I have to find an answer, and this is frustrating.

Appreciate your help.

[Image: Kinesis monitoring charts while the checkpoint-recovery error is happening; Read Throughput Exceeded spikes]

[Image: Spark UI showing jobs stuck while the checkpoint-recovery error is happening]

  • Have you checked the Kinesis stream usage at the time of recovery from the checkpoint? – Neha Jirafe Dec 25 '19 at 14:44
  • Yes I have. At the instant of recovery, there was a spike in the chart "Read Throughput Exceeded (Count / Request) — Average ReadProvisionedThroughputExceeded", and no Get Record count was seen. – Shu Dec 25 '19 at 16:50
  • I observed that the recovery does eventually complete, though it takes a long time; since I keep a one-hour window of data in memory, recovery can take 15 minutes or much longer. At peak time it will eventually crash the application. The ProvisionedThroughputExceededException still occurs very frequently, even though I tried increasing blockInterval to 1000 ms so there are fewer partitions per batch. I also tried enabling the WAL, but still saw abundant Kinesis errors (the exact settings are sketched after this list). Note this link: (https://stackoverflow.com/questions/38390567/spark-streaming-checkpoint-recovery-is-very-very-slow). What should I do? – Shu Dec 30 '19 at 18:39
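
For reference, these are the settings mentioned in the comments above, with the values I tried (a sketch of my configuration, not a fix):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Fewer, larger blocks per batch => fewer partitions to re-read on recovery.
      .set("spark.streaming.blockInterval", "1000ms")              // default: 200ms
      // Recover received blocks from the write-ahead log instead of Kinesis.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")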

0 Answers