I have a spark application which consume from Kinesis with 6 shards. Data was produced to Kinesis at at most 2000 records/second. At non peak time data only comes in at 200 records/second. Each record is 0.5K Bytes. So 6 shards is enough to handle that.
I am using EMR 5.23.0, Spark 2.4.0, spark-streaming-kinesis-asl 2.4.0 I have 6 r5.4xLarge in my cluster, plenty of memory
Recently I am trying to checkpoint the application to S3. I am testing this at nonpeak time so the data incoming rate is very low like 200 records/sec. I run the Spark application by creating new context, checkpoint is created at s3, but when I kill the app and restarts, it failed to recover from checkpoint, and the error message is the following and my SparkUI shows all the batches are stucked:
19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last exception:
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000004 in stream my-stream-name under account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2782)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2749)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2738)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1383)
at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1355)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
... 20 more
My batch interval is 2.5 minutes, and I create the stream the regular way:
val kinesisStreams = (0 until 6).map { i =>
KinesisUtils.createStream(ssc, "MyKinesisStreamName", settings.kinesis_stream, settings.kinesis_endpoint, settings.kinesis_region, settings.kinesis_initial_position, Milliseconds(settings.spark_batchinterval_seconds*1000), StorageLevel.MEMORY_AND_DISK)
}
val unionStreams = ssc.union(kinesisStreams)
.map(linebytes => new String(linebytes, "UTF-8").replace("\"", ""))
Someone reported the same problem and does not seem to have an answer:
http://mail-archives.apache.org/mod_mbox/spark-issues/201807.mbox/%3CJIRA.13175528.1532948992000.116869.1532949000171@Atlassian.JIRA%3E Checkpointing records with Amazon KCL throws ProvisionedThroughputExceededException
Since Spark consuming from Kinesis with checkpoint are such a common thing in many Spark application, I am wondering whether I did anything wrong? Is there any other person experiencing the same and how to resolve this?
I am wondering if this is because my batch interval 2.5 minutes (150 seconds) is too long? because 150 seconds*200 record/second = 30000 records per batch and when checkpoint recovery is trying to load 30000 records from kinesis it will throw the error? Should I increase my shard count to 30 from 6?
Please help, I have to find an answer so this is frustrating.
Appreciate your help.