
Elasticsearch version: 6.8.0

30 Spark tasks run in parallel, each sending 128 MB (452 records) per bulk request:

JavaEsSpark.saveToEs(elasticRecords, indexName);
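
For context, here is roughly how the job is wired up. This is only a sketch: the SparkConf values and the sample record are illustrative stand-ins; only elasticRecords, indexName and the saveToEs call are from the actual job.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsBulkLoad {
    public static void main(String[] args) {
        // es-hadoop reads its "es.*" settings from the Spark configuration
        // (host/port values here are illustrative).
        SparkConf sparkConf = new SparkConf()
                .setAppName("es-bulk-load")
                .set("es.nodes", "xyz.es.amazonaws.com")
                .set("es.port", "443")
                .set("es.nodes.wan.only", "true");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Hypothetical stand-in for the real 1.4 TB dataset.
        Map<String, Object> doc = new HashMap<>();
        doc.put("providerId", "12345");
        JavaRDD<Map<String, Object>> elasticRecords =
                jsc.parallelize(Arrays.asList(doc));

        String indexName = "xyz/healthCareProvider";

        // Each Spark task flushes its partition to Elasticsearch as bulk requests.
        JavaEsSpark.saveToEs(elasticRecords, indexName);

        jsc.stop();
    }
}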

I have around 1.4 TB of data. The executors ran fine for about 2 hours and successfully loaded 800+ GB.

After that I started seeing many 429 errors. I expected "es.batch.write.retry.wait" and "es.batch.write.retry.count" to delay the Spark tasks, but each task simply failed and another task was launched within milliseconds.

https://github.com/elastic/elasticsearch-hadoop/blob/99937f25ed868c91bf19e5cdab527a61ea5ac60d/mr/src/main/java/org/elasticsearch/hadoop/rest/bulk/BulkProcessor.java#L196 (the bulk flush throws on the 429 and the Spark task fails)

Why is there no retry wait for 429 responses, so that Elasticsearch gets ample time to recover from the 429 state?

Is there any way to slow down the Spark tasks, for example by launching them with some delay?
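
One workaround I am considering (just a sketch, the partition count of 10 is arbitrary): coalesce the RDD before writing so that fewer tasks send bulk requests at the same time.

// Hypothetical mitigation: fewer concurrent writers means fewer
// simultaneous bulk requests hitting the cluster. elasticRecords and
// indexName are the same variables as above.
JavaEsSpark.saveToEs(elasticRecords.coalesce(10), indexName);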

org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [PUT] on [xyz/healthCareProvider/_bulk] failed; server[xyz.es.amazonaws.com/:443] returned [429|Too Many Requests:]
    at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:448)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:405)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:387)
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:224)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.tryFlush(BulkProcessor.java:196)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:499)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:113)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:108)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:108)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.

es.batch.write.retry.count=1000
es.batch.size.bytes=100mb
es.batch.write.retry.wait=100s
es.batch.write.retry.policy=simple
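
For reference, a sketch of how the same settings can also be supplied per write through the saveToEs overload that accepts a configuration map, assuming the same elasticRecords and indexName as above:

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// Same retry settings as above, passed per write instead of globally.
Map<String, String> writeCfg = new HashMap<>();
writeCfg.put("es.batch.write.retry.count", "1000");
writeCfg.put("es.batch.size.bytes", "100mb");
writeCfg.put("es.batch.write.retry.wait", "100s");
writeCfg.put("es.batch.write.retry.policy", "simple");

JavaEsSpark.saveToEs(elasticRecords, indexName, writeCfg);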

References

https://discuss.elastic.co/t/spark-es-batch-write-retry-count-negative-value-is-ignored/25436/2

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/performance.html

Elasticsearch drops too many requests -- would a buffer improve things?

What is the ideal bulk size formula in ElasticSearch?

https://github.com/elastic/elasticsearch/issues/15465

writing to elastic search is slow from spark and scaling up nodes throws up errors

Elastic search could not write all entries: May be es was overloaded

https://github.com/wso2-attic/wso2-spark/blob/master/docs/configuration.md
