I have an input of around 300 million records that I need to load into Amazon Elasticsearch, and I am using Amazon EMR (Spark) to do the load.
My EMR configuration is: master r5.4xlarge (16 vCores, 128 GiB memory) and 10 worker nodes r5.8xlarge (32 vCores and 256 GiB memory each).
My Elasticsearch configuration is: data instance type i3.large.elasticsearch, 5 data nodes, and 3 dedicated master instances.
The input I am reading is from S3. This is the code I have:
SparkConf sparkConf = new SparkConf()
        .setAppName("SparkLoadToES");
// Endpoint and port redacted; the real values point at the AWS Elasticsearch domain.
sparkConf.set("spark.es.nodes", "<es-domain-endpoint>");
sparkConf.set("spark.es.port", "<es-port>");
sparkConf.set("spark.es.nodes.wan.only", "true");
sparkConf.set("spark.es.batch.write.refresh", "false");
sparkConf.set("spark.es.index.auto.create", "true");
sparkConf.set("spark.es.resource", "test-index");
sparkConf.set("spark.es.batch.size.entries", "25000");
sparkConf.set("spark.es.batch.write.retry.wait", "30s");
sparkConf.set("spark.es.batch.write.retry.count", "-1");
sparkConf.set("spark.es.batch.size.bytes", "15mb");

SparkSession sparkSession = SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate();

// Read the parquet input from S3 and spread it across 320 write tasks.
Dataset<Row> datasetToLoad = sparkSession.read().parquet("s3a://databucket/temp/").repartition(320);
datasetToLoad.cache();
datasetToLoad.write().format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save("test-index");
Writing to Elasticsearch is very slow: it takes about 2 hours to load 200 million records (roughly 28k docs/sec), and I want to get that down to 30-40 minutes for all 300 million (roughly 125k-170k docs/sec).
The index on Amazon Elasticsearch has 9 shards and 0 replicas.
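For reference, this is roughly how the index was pre-created with those settings before the load. The endpoint is a placeholder and this uses the low-level Elasticsearch REST client, so treat it as a sketch rather than the exact call I run:

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class CreateIndex {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint for the AWS Elasticsearch domain (HTTPS on 443).
        RestClient client = RestClient.builder(
                new HttpHost("<es-domain-endpoint>", 443, "https")).build();

        // 9 primary shards and 0 replicas, matching the index settings described above.
        Request createIndex = new Request("PUT", "/test-index");
        createIndex.setJsonEntity(
                "{\"settings\":{\"number_of_shards\":9,\"number_of_replicas\":0}}");
        client.performRequest(createIndex);

        client.close();
    }
}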
I want the write to Elasticsearch to finish in about 30 minutes, but it is very slow, and it clearly looks like there is some bottleneck on the Elasticsearch side.
Increasing the shard count further pushes the Elasticsearch cluster into Red status, and adding more nodes to the Elasticsearch cluster has not helped either. Is there any way I can tune the EMR and Elasticsearch parameters to get all 300 million records loaded in under an hour?
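To see why the cluster turns red after adding shards, I look at cluster health and shard allocation roughly along these lines (same low-level REST client and placeholder endpoint as above, just a sketch):

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class ClusterHealthCheck {
    public static void main(String[] args) throws Exception {
        RestClient client = RestClient.builder(
                new HttpHost("<es-domain-endpoint>", 443, "https")).build();

        // Overall cluster status (green/yellow/red) plus shard counts.
        Response health = client.performRequest(new Request("GET", "/_cluster/health?pretty"));
        System.out.println(EntityUtils.toString(health.getEntity()));

        // Explains why the first unassigned shard cannot be allocated
        // (this call returns a 400 if every shard is already assigned).
        Response explain = client.performRequest(new Request("GET", "/_cluster/allocation/explain?pretty"));
        System.out.println(EntityUtils.toString(explain.getEntity()));

        client.close();
    }
}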
Adding too many shards to the index in Elasticsearch gives this error:
executor 62: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest ([HEAD] on [test-index] failed; server[endpoint.us-east-1.es.amazonaws.com:443] returned [429|Too Many Requests:]) [duplicate 2]
Adding too many Spark nodes gives the error below:
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: es_rejected_execution_exception: rejected execution of processing of [76498][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[test-index][7]] containing [3] requests, target allocation id: a02WQn91TdqUU7n3NSWufw, primary term: 1 on EsThreadPoolExecutor[name = 9e97ab8ff87aeecaf789b7203d64a894/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7ca1f1c1[Running, pool size = 2, active threads = 2, queued tasks = 200, completed tasks = 22084]]
{"index":{}}
Bailing out...
at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:113)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
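Given the es_rejected_execution_exception above (each i3.large data node only has a write thread pool of size 2 with a bulk queue of 200, as the stack trace shows), one thing I am considering is writing with fewer concurrent tasks rather than more. This builds on the datasetToLoad from the code above, and the partition count is just a guess, not a tested value:

// Sketch: reduce the number of concurrent bulk writers so the 5 data nodes
// (2 write threads and 200 queue slots each) are not flooded. The figure 45
// is an arbitrary assumption to experiment with.
Dataset<Row> throttled = datasetToLoad.coalesce(45);
throttled.write()
        .format("org.elasticsearch.spark.sql")
        .mode(SaveMode.Append)
        .save("test-index");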
These are the Spark tuning params in my spark-submit command:
--conf spark.default.parallelism=400 --conf spark.dynamicAllocation.enabled=true --conf maximizeResourceAllocation=true --conf spark.shuffle.service.enabled=true
How do I find the right balance between Spark nodes and AWS Elasticsearch nodes? It looks like adding more nodes to Elasticsearch gives more cores, and more cores means more write capacity, but scaling up the Amazon Elasticsearch cluster does not seem to be working.
Am I doing anything wrong here?