I have an application that reads CSV files, applies some transformations, and then pushes the results to Elasticsearch directly from Spark, like this:
input.write.format("org.elasticsearch.spark.sql")
.mode(SaveMode.Append)
.option("es.resource", "{date}/" + type).save()
I have several nodes, and on each node I run 5-6 spark-submit commands that push to Elasticsearch.
I frequently get errors like this:
Could not write all entries [13/128] (Maybe ES was overloaded?). Error sample (first [5] error messages):
rejected execution of org.elasticsearch.transport.TransportService$7@32e6f8f8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4448a084[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 451515]]
My Elasticsearch cluster has the following specs:
Nodes: 9 (1 TB disk space, RAM >= 15 GB, more than 8 cores per node)
I have modified the following parameters for the Elasticsearch connector:
spark.es.batch.size.bytes=5000000
spark.es.batch.size.entries=5000
spark.es.batch.write.refresh=false
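For reference, here is a minimal sketch of how the same three settings would look if passed as per-write options instead of through spark-submit configuration. The names `EsWriteOptions`, `esOptions` and `asSparkConf` are introduced only for illustration and are not part of the connector. As far as I understand, the `spark.` prefix is only needed when the settings go through Spark's own configuration, so the keys appear without it here.

```scala
// Sketch only: the three settings from this question expressed as writer options.
// EsWriteOptions, esOptions and asSparkConf are illustrative names, not connector API.
object EsWriteOptions {
  // Keys carry no "spark." prefix when passed directly to the writer.
  val esOptions: Map[String, String] = Map(
    "es.batch.size.bytes"    -> "5000000", // max bytes per bulk request, per task
    "es.batch.size.entries"  -> "5000",    // max documents per bulk request, per task
    "es.batch.write.refresh" -> "false"    // do not refresh the index after each bulk write
  )

  // Convert back to the form used with spark-submit --conf or SparkConf.
  def asSparkConf(opts: Map[String, String]): Map[String, String] =
    opts.map { case (key, value) => s"spark.$key" -> value }
}

// Usage with the writer shown earlier (needs Spark and elasticsearch-hadoop on the classpath):
// input.write.format("org.elasticsearch.spark.sql")
//   .options(EsWriteOptions.esOptions)
//   .mode(SaveMode.Append)
//   .option("es.resource", "{date}/" + type).save()
```

Since the batch sizes apply per task, every concurrently running task across all of the spark-submit jobs sends its own bulk requests of up to this size.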
Could anyone suggest what I can change to get rid of these errors?