apache-spark, elasticsearch, apache-spark-sql, elasticsearch-spark

Elasticsearch could not write all entries: Maybe ES was overloaded


I have an application where I read CSV files, do some transformations, and then push them to Elasticsearch from Spark itself, like this:

input.write.format("org.elasticsearch.spark.sql")
              .mode(SaveMode.Append)
              .option("es.resource", "{date}/" + type).save()

I have several nodes, and on each node I run 5-6 spark-submit jobs that push to Elasticsearch.

I am frequently getting errors like:

Could not write all entries [13/128] (Maybe ES was overloaded?). Error sample (first [5] error messages):
        rejected execution of org.elasticsearch.transport.TransportService$7@32e6f8f8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4448a084[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 451515]]

My Elasticsearch cluster has the following stats:

Nodes - 9 (1 TB disk space and >= 15 GB RAM each, more than 8 cores per node)

I have modified the following parameters for the Elasticsearch Spark connector:

spark.es.batch.size.bytes=5000000
spark.es.batch.size.entries=5000
spark.es.batch.write.refresh=false

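For reference, a minimal sketch of how these settings and the write fit together when set programmatically (the app name, CSV path, header option, and document type below are illustrative placeholders, not my actual job; the spark.-prefixed form above should be the equivalent when passed via spark-submit --conf):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("csv-to-es")                      // placeholder app name
  .config("es.batch.size.bytes", "5000000")  // elasticsearch-hadoop bulk settings
  .config("es.batch.size.entries", "5000")
  .config("es.batch.write.refresh", "false")
  .getOrCreate()

// read the CSV input (path and options are placeholders)
val input = spark.read.option("header", "true").csv("/data/incoming/*.csv")

// ... transformations ...

val docType = "doc"                          // placeholder for the real type
input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + docType)
  .save()
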
Could anyone suggest what I can fix to get rid of these errors?


Solution

  • This occurs because bulk requests are arriving faster than the Elasticsearch cluster can process them, and the bulk request queue is full.

    The default bulk queue size is 200.

    Ideally, you should handle this on the client side:

    1) Reduce the number of spark-submit jobs running concurrently.

    2) Retry in case of rejections by tweaking es.batch.write.retry.count and es.batch.write.retry.wait.

    Example:

    es.batch.write.retry.wait = "60s"
    es.batch.write.retry.count = 6
    
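    A quick sketch of how these retry settings can be applied directly on the write from Spark (this reuses the input DataFrame and index pattern from the question; docType stands in for the question's type variable, and the values mirror the example above):

    input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.batch.write.retry.count", "6")   // retry rejected bulk batches up to 6 times
      .option("es.batch.write.retry.wait", "60s")  // wait 60 seconds between retries
      .option("es.resource", "{date}/" + docType)  // placeholder for the real document type
      .save()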

    On the Elasticsearch cluster side:

    1) Check whether there are too many shards per index and try reducing the shard count.
    This blog has a good discussion of the criteria for tuning the number of shards.

    2) As a last resort, increase the bulk thread pool queue size (thread_pool.bulk.queue_size in Elasticsearch 5.x).

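    For example, in elasticsearch.yml (assuming Elasticsearch 5.x; the setting is static, so it needs a node restart, and the name differs in other major versions):

    thread_pool.bulk.queue_size: 500   # illustrative value; raise cautiously
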
    Check this blog for an extensive discussion of bulk rejections.