
I have an input of around 300 million records to store in Amazon Elasticsearch, and I am using Amazon EMR (Spark) to load them into Amazon Elasticsearch.

My EMR configuration is: master r5.4xlarge (16 vCores, 128 GiB memory) and slaves r5.8xlarge (10 nodes, 32 vCores and 256 GiB memory per node).

My ES configuration is: data instance type i3.large.elasticsearch, 5 data nodes, and 3 dedicated master instances.

The input I am reading is from S3; this is the code I have:

    SparkConf sparkConf = (new SparkConf())
            .setAppName("SparkLoadToES");
    // Elasticsearch endpoint and port (values omitted here)
    sparkConf.set("spark.es.nodes", );
    sparkConf.set("spark.es.port", );
    sparkConf.set("spark.es.nodes.wan.only", "true");
    // Bulk write behaviour
    sparkConf.set("spark.es.batch.write.refresh", "false");
    sparkConf.set("spark.es.index.auto.create", "true");
    sparkConf.set("spark.es.resource", "test-index");
    sparkConf.set("spark.es.batch.size.entries", "25000");
    sparkConf.set("spark.es.batch.write.retry.wait", "30s");
    sparkConf.set("spark.es.batch.write.retry.count", "-1");
    sparkConf.set("spark.es.batch.size.bytes", "15mb");


 SparkSession sparkSession = SparkSession
              .builder()
              .config(sparkConf)
              .getOrCreate();

Dataset<Row> datasetToLoad = sparkSession.read().parquet("s3a://databucket/temp/").repartition(320);
 datasetToLoad.cache();

datasetToLoad.write().format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save("test-index");

The data load is very slow when writing to Elasticsearch: it is taking 2 hours to load 200 million records, and I want to get this down to around 30 to 40 minutes for 300 million.

The index on Amazon Elasticsearch has 9 shards and 0 replicas.

I want to get the write to Elasticsearch done in 30 minutes, but it is very slow, and it clearly looks like there is some bottleneck on the Elasticsearch side.

Increasing the shard count further puts the Elasticsearch cluster into red status, and adding more nodes to the Elasticsearch cluster is also of no use. Is there any way I can tune EMR and Elasticsearch with the right parameters to get the job done in under an hour for all 300 million records?
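For reference, a minimal sketch of creating the index up front with explicit shards/replicas and refresh disabled for the duration of the load, rather than relying on es.index.auto.create (the endpoint below is a placeholder, and a domain with fine-grained access control would additionally need signed or authenticated requests):

    // Sketch only: create the index before the Spark job with explicit settings.
    // The endpoint is a placeholder, not the real domain endpoint.
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CreateIndexForBulkLoad {
        public static void main(String[] args) throws Exception {
            String endpoint = "https://<es-endpoint>"; // placeholder
            String body = "{"
                    + "\"settings\": {"
                    + "\"number_of_shards\": 9,"
                    + "\"number_of_replicas\": 0,"
                    + "\"refresh_interval\": \"-1\""  // disable refresh only for the bulk load
                    + "}}";

            HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint + "/test-index"))
                    .header("Content-Type", "application/json")
                    .PUT(HttpRequest.BodyPublishers.ofString(body))
                    .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }

After the load finishes, refresh_interval would be set back (for example to "1s") with a PUT to /test-index/_settings.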

Adding too many shards to the Elasticsearch index gives this error:

executor 62: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest ([HEAD] on [test-index] failed; server[endpoint.us-east-1.es.amazonaws.com:443] returned [429|Too Many Requests:]) [duplicate 2]

Adding too many nodes on the Spark (EMR) side gives the error below:

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: es_rejected_execution_exception: rejected execution of processing of [76498][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[test-index][7]] containing [3] requests, target allocation id: a02WQn91TdqUU7n3NSWufw, primary term: 1 on EsThreadPoolExecutor[name = 9e97ab8ff87aeecaf789b7203d64a894/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7ca1f1c1[Running, pool size = 2, active threads = 2, queued tasks = 200, completed tasks = 22084]]
    {"index":{}}

Bailing out...
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:113)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
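
The rejection above shows each data node has a bulk write thread pool of size 2 with a queue capacity of 200, so the two levers that can realistically come down are how many Spark tasks write at once and how large each bulk request is. A rough sketch of that direction (the exact values are illustrative, not something I have verified):

    // Sketch only: keep the number of concurrently writing tasks close to the
    // number of primary shards, and send smaller bulk requests, so each data
    // node's write queue (capacity 200, pool size 2) is not flooded.
    sparkConf.set("spark.es.batch.size.entries", "5000");   // down from 25000
    sparkConf.set("spark.es.batch.size.bytes", "5mb");      // down from 15mb
    sparkConf.set("spark.es.batch.write.retry.count", "6"); // bounded retries instead of infinite (-1)
    sparkConf.set("spark.es.batch.write.retry.wait", "60s");

    int numberOfPrimaryShards = 9; // matches the index settings above

    Dataset<Row> datasetToLoad = sparkSession.read()
            .parquet("s3a://databucket/temp/")
            .repartition(numberOfPrimaryShards); // roughly one write task per shard instead of 320

    datasetToLoad.write()
            .format("org.elasticsearch.spark.sql")
            .mode(SaveMode.Append)
            .save("test-index");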

These are the Spark tuning parameters from my spark-submit command:

--conf spark.default.parallelism=400 --conf spark.dynamicAllocation.enabled=true --conf maximizeResourceAllocation=true --conf spark.shuffle.service.enabled=true
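
With dynamic allocation enabled and 320 partitions, far more tasks can be writing simultaneously than the small i3.large data nodes can absorb. A sketch of capping the Spark-side concurrency instead (values illustrative; the same settings can also be passed as --conf flags on spark-submit):

    // Sketch only: cap executors so the number of simultaneously active write
    // tasks stays near the shard count, instead of letting dynamic allocation
    // scale the writers up to the full EMR cluster.
    sparkConf.set("spark.dynamicAllocation.enabled", "true");
    sparkConf.set("spark.dynamicAllocation.maxExecutors", "5"); // e.g. one executor per ES data node
    sparkConf.set("spark.executor.cores", "2");                 // 5 executors x 2 cores = ~10 concurrent writers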

How do I find the right balance between Spark nodes and AWS Elasticsearch nodes? It looks like adding more nodes to Elasticsearch gives more cores, and more cores means more write capacity, but scaling up the Amazon Elasticsearch cluster does not seem to be working.

Am I doing anything wrong here?

  • Try lowering your parallelism (or pre-partition the data) to the number of shards. If you have 5 shards, then you really only need 5 partitions actively writing. Then try to keep the number of shards equal to the number of CPU cores in your ES cluster – Andrew White May 18 '20 at 00:59
  • I have 8 nodes in Elasticsearch, each with 2 CPU cores. Do you mean increasing the shards to 8*2 = 16 shards? –  May 18 '20 at 01:16
  • I am getting the below error when I changed to 16 shards with only 16 partitions in Spark: org.elasticsearch.hadoop.rest.EsHadoopRemoteException: es_rejected_execution_exception: rejected execution of processing of [208777][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[test-index][3]] containing [4] requests, target allocation id: DT27vS7DSKuRBO8d4WmOwA, primary term: 1 on EsThreadPoolExecutor[name = c232693ac4dccb10c93d159687d32019/write, queue capacity = 200, –  May 18 '20 at 01:18
  • org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@42ecd85b[Running, pool size = 2, active threads = 2, queued tasks = 200, completed tasks = 57864]] {"index":{}} –  May 18 '20 at 01:18
  • I'm not super familiar with the org.elasticsearch.spark.sql format so my guess is that it's doing something funny. In short, it looks like Spark is DoSing your ES cluster with too many parallel writes and you need to find a way to tell Spark to scale back. – Andrew White May 18 '20 at 01:20
  • What are you using for storage? The NVME SSD disk of the instance or an EBS volume? Have you tried to change the instance type just during the first load? What is your average document size? You are talking about indexing 300 million documents in 30 minutes, this is around 165k events per second, this seems pretty high for your instance type. The error `es_rejected_execution_exception` is an indication that your cluster can not index as fast as it is receiving data. – leandrojmp May 18 '20 at 01:31
  • @leandrojmp I am using the NVMe SSD storage; the average document size is ~493 bytes. I am using NVMe SSD instance types with 8 data nodes (2 vCPU cores each) and 3 master nodes –  May 18 '20 at 01:50
  • How many data nodes do you have? Try to set the number of shards equal to the number of data nodes, without any replicas in the first load, but still, you are getting 200 million documents in 2 hours, which gives you an indexing rate of around 27k e/s; you are aiming for an indexing rate of around 165k e/s, 6 times higher, and I don't think your current cluster can handle that. Have you read this [official document](https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html)? Try to follow those tips to see if it improves something. – leandrojmp May 18 '20 at 02:19
  • @leandrojmp I have 8 data nodes in the Elasticsearch cluster. I tried setting the shards to 8 and replicas to 0 initially and that gave me a rate of 200 million docs in 2.1 hours. I have read the official document as well as https://aws.amazon.com/premiumsupport/knowledge-center/elasticsearch-indexing-performance/ and tried following everything, still no performance improvement. I would be okay if I could at least get it down to somewhere near 1 hour 30 minutes for 300 million records. –  May 18 '20 at 02:34
