go elasticsearch bulkinsert

ES bulk request `es_rejected_execution_exception`


I have a slice of some 5M entries (for simplicity, let's say each entry is a slice of bytes, which is mapped to an indexer item using the getIndexerItem function), which I split equally between 200 goroutines. Each goroutine then calls the push function with its slice of length 5M/200.

Based on my understanding of Refresh: wait_for, a request made to Elasticsearch completes only when the changes made by that request are visible to search (which IMO means the bulk request queue no longer holds that particular request). Then why am I getting this error?

error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]: 
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
    name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
    [Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]

All entries are going to the same index, ankit-test.

func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
    return esutil.BulkIndexerItem{
        Index:        index,
        DocumentID:   id,
        Body:         bytes.NewReader(body),
        Action:       "index",
        DocumentType: "logs",
        OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
            if err != nil {
                fmt.Printf("error indexing item: %s\n", err.Error())
            } else {
                fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
            }
        },
    }
}

func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
    // A new indexer is created on every call to push, i.e. one per goroutine.
    indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Client:     e.client,
        Refresh:    "wait_for",
        NumWorkers: 1,
        OnError: func(ctx context.Context, err error) {
            fmt.Printf("received onError %s\n", err.Error())
        },
    })
    if err != nil {
        // %w keeps the original error available to errors.Is / errors.As.
        return nil, fmt.Errorf("error creating bulk indexer: %w", err)
    }

    ctx := context.Background()
    for _, d := range data {
        if err := indexer.Add(ctx, d); err != nil {
            fmt.Printf("error adding data to indexer: %s\n", err)
        }
    }
    if err := indexer.Close(ctx); err != nil {
        fmt.Printf("error flushing and closing indexer: %s\n", err)
    }

    indexerStats := indexer.Stats()
    return &indexerStats, nil
}

Assume no other process is interacting with the index in any way.


Solution

  • After reading several ES docs, I found a solution to the above problem. The answer below is based on my understanding; please comment if anything can be improved or corrected.


    Here is the request lifecycle:

    1. The Go ES client combines multiple requests into one and sends them to the server as a single bulk request. A single bulk request can contain documents destined for multiple indices and shards.
    2. When a bulk request arrives at a node in the cluster (the coordinating node for that request), it is put on the bulk queue in its entirety and processed by the threads in the bulk thread pool (named write in recent ES versions, as in the error above).
    3. The coordinating node splits the bulk request based on which shards the documents need to be routed to. Each bulk sub-request is forwarded to the data node that holds the corresponding primary shard and is enqueued on that node's bulk queue. If no space is available on the queue, the coordinating node is notified that the bulk sub-request has been rejected.
    4. Once all sub-requests have completed or been rejected, a response is created and returned to the client. It is possible, and even likely, that only a portion of the documents within a bulk request is rejected.

    My issue was that I was sending requests with refresh = false (the default); refresh = wait_for should have been used instead. Why? refresh provides three modes:

    1. false: Take no refresh-related actions. The changes made by this request will become visible at some point after the request returns. The request has not necessarily completed by the time the response is received; it may still be in the node's queue.
    2. true: Refresh the relevant primary and replica shards immediately after the operation occurs. This ensures the request has completed, and has been removed from the node's queue, before the response is sent back.
    3. wait_for: Wait for the changes made by the request to be made visible by a refresh before replying. Unlike true, this doesn't force an immediate refresh; it waits for a refresh to happen. It is less expensive than refresh = true (in terms of load on the server), but still ensures the request has completed, and has been removed from the node's queue, before the response is sent back.
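
At the REST level, refresh is a query parameter on the _bulk endpoint, and the Refresh field of the bulk indexer config is passed through as that parameter. A small sketch of how the three modes appear in the request URL; `bulkURL` is a hypothetical helper and `http://localhost:9200` is an assumed address, not something taken from my setup.

```go
package main

import (
	"fmt"
	"net/url"
)

// bulkURL builds the _bulk endpoint URL for one of the three refresh modes.
func bulkURL(base, refresh string) (string, error) {
	switch refresh {
	case "false", "true", "wait_for":
		// supported modes
	default:
		return "", fmt.Errorf("unsupported refresh mode %q", refresh)
	}
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	u.Path = "/_bulk"
	q := u.Query()
	q.Set("refresh", refresh)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	for _, mode := range []string{"false", "true", "wait_for"} {
		u, err := bulkURL("http://localhost:9200", mode) // assumed address
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(u)
	}
}
```

Checking the URL the client actually sends is a quick way to confirm which mode is in effect.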

    All data was being routed to the same node, and because of refresh = false, responses were returned before the existing requests were cleared from the queue, which caused the queue to overflow.