3

I have an application that reads CSV files, does some transformations, and then pushes the results to Elasticsearch directly from Spark, like this:

input.write.format("org.elasticsearch.spark.sql")
              .mode(SaveMode.Append)
              .option("es.resource", "{date}/" + type).save()

I have several nodes, and on each node I run 5-6 spark-submit commands that push to Elasticsearch.

I am frequently getting errors like:

Could not write all entries [13/128] (Maybe ES was overloaded?). Error sample (first [5] error messages):
        rejected execution of org.elasticsearch.transport.TransportService$7@32e6f8f8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4448a084[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 451515]]

My Elasticsearch cluster has the following stats:

Nodes - 9 (1 TB space, RAM >= 15 GB, more than 8 cores per node)

I have modified the following parameters for Elasticsearch:

spark.es.batch.size.bytes=5000000
spark.es.batch.size.entries=5000
spark.es.batch.write.refresh=false
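
They are supplied through the Spark configuration (the connector picks up es.* options prefixed with spark.); a rough sketch of the session setup, for illustration only:

    import org.apache.spark.sql.SparkSession

    // Sketch only: the es.* settings are read from the Spark conf when
    // prefixed with "spark." (the connector strips the prefix).
    val spark = SparkSession.builder()
      .appName("csv-to-es")                             // illustrative app name
      .config("spark.es.batch.size.bytes", "5000000")
      .config("spark.es.batch.size.entries", "5000")
      .config("spark.es.batch.write.refresh", "false")
      .getOrCreate()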

Could anyone suggest what I can fix to get rid of these errors?

hard coder

2 Answers

5

This occurs because bulk requests are arriving at a rate greater than the Elasticsearch cluster can process them, so the bulk request queue fills up.

The default bulk queue size is 200.

Ideally, you should handle this on the client side:

1) Reduce the number of spark-submit commands running concurrently.

2) Retry in case of rejections by tweaking the es.batch.write.retry.count and es.batch.write.retry.wait

Example:

es.batch.write.retry.wait = "60s"
es.batch.write.retry.count = 6
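
A minimal sketch of applying these on the write from the question (esType is a stand-in for the question's type variable; the values are the ones above):

    import org.apache.spark.sql.SaveMode

    input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.batch.write.retry.count", "6")   // retry a rejected bulk batch up to 6 times
      .option("es.batch.write.retry.wait", "60s")  // wait 60s between retries
      .option("es.resource", "{date}/" + esType)
      .save()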

On the Elasticsearch cluster side:

1) Check whether there are too many shards per index and try reducing that number (see the sketch after this list). This blog has a good discussion of the criteria for tuning the number of shards.

2) As a last resort, increase the bulk thread pool queue size (thread_pool.bulk.queue_size), as sketched below.
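
For illustration only (the index name, shard count, and queue size below are assumptions, not tuned recommendations):

    # Shard count is fixed at index-creation time, so new (e.g. daily) indices
    # would need to be created with fewer primaries, e.g. 45 (9 nodes * 5) instead of 90.
    curl -X PUT "localhost:9200/2018-03-28" -H 'Content-Type: application/json' \
         -d '{ "settings": { "number_of_shards": 45 } }'

    # elasticsearch.yml on every data node (restart required); the pool is named
    # "bulk" in 5.x and was renamed to "write" in later versions.
    thread_pool.bulk.queue_size: 500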

Check this blog for an extensive discussion of bulk rejections.

Sam
keety
  • I have 90 shards per index, because I want to distribute data evenly across nodes (1 TB storage, 9 nodes total); otherwise some nodes run out of space while others have plenty. – hard coder Mar 28 '18 at 05:51
  • @Hardcoder makes sense, but could you try a multiplicative factor less than 10? Example: 9*5 = 45. This blog has a good discussion on tuning the number of shards. Edited the answer to include the blog link. – keety Mar 28 '18 at 15:23
  • In my case, when using Spark we simply reduced the number of executors and repartitioned to deal with the ES insertion speed. It's a trick to avoid bulk insertion retries. – Mathieu Barbot Nov 24 '20 at 15:51
0

The bulk queue in your ES cluster is hitting its capacity (200). Try increasing it. See this page for how to change the bulk queue capacity:

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

Also check this other SO answer, where the OP had a very similar issue that was fixed by increasing the bulk pool size:

Rejected Execution of org.elasticsearch.transport.TransportService Error
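
A quick way to confirm whether the queue is the bottleneck (a sketch, assuming ES answers on localhost:9200) is to watch the bulk pool's per-node counters before and after the change:

    # shows active threads, queued tasks and cumulative rejections per node
    curl -s "localhost:9200/_cat/thread_pool/bulk?v&h=node_name,active,queue,rejected"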

jay