I have a slice of some 5M entries (for simplicity, let's say each entry is a slice of bytes, which is mapped to an indexer item using the getIndexerItem function below), which I split equally between 200 goroutines. Each goroutine then calls the push function with a slice of length 5M/200 = 25,000 (see the sketch after push below).
Based on my understanding of Refresh: wait_for, any request made to Elasticsearch completes only once the changes made by that request are visible to search (which, as I understand it, means the request is no longer sitting in the node's bulk queue). Then why am I getting this error?
error indexing item: es_rejected_execution_exception:
rejected execution of processing of [358323543][indices:data/write/bulk[s][p]]:
request: BulkShardRequest [[ankit-test][3]] containing [3424] requests blocking until refresh,
target allocation id: someId, primary term: 1 on EsThreadPoolExecutor
[
name = machine_name/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@1f483ca1
[Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 44390708]
]
All entries are going to the same index, ankit-test.
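The queued/rejected counters behind this error can be watched on the write thread pool via the _cat/thread_pool API. A minimal sketch using the same Go client (e.client is the *elasticsearch.Client used in push below; the option names come from the generated esapi package):

res, err := e.client.Cat.ThreadPool(
	e.client.Cat.ThreadPool.WithThreadPoolPatterns("write"),
	e.client.Cat.ThreadPool.WithH("node_name", "name", "active", "queue", "rejected"),
	e.client.Cat.ThreadPool.WithV(true),
)
if err != nil {
	fmt.Printf("error fetching thread pool stats: %s\n", err)
	return
}
defer res.Body.Close()
io.Copy(os.Stdout, res.Body) // one line per node: active threads, queued and rejected tasks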
func (e *esClient) getIndexerItem(index string, id string, body []byte) esutil.BulkIndexerItem {
	return esutil.BulkIndexerItem{
		Index:        index,
		DocumentID:   id,
		Body:         bytes.NewReader(body),
		Action:       "index",
		DocumentType: "logs",
		OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
			if err != nil {
				fmt.Printf("error indexing item: %s\n", err.Error())
			} else {
				fmt.Printf("error indexing item: %s: %s\n", res.Error.Type, res.Error.Reason)
			}
		},
	}
}
func (e *esClient) push(data []esutil.BulkIndexerItem) (*esutil.BulkIndexerStats, error) {
	indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     e.client,
		Refresh:    "wait_for",
		NumWorkers: 1,
		OnError: func(ctx context.Context, err error) {
			fmt.Printf("received onError %s\n", err.Error())
		},
	})
	if err != nil {
		return nil, fmt.Errorf("error creating bulk indexer: %s", err)
	}

	ctx := context.Background()
	for _, d := range data {
		if err := indexer.Add(ctx, d); err != nil {
			fmt.Printf("error adding data to indexer: %s\n", err)
		}
	}

	if err := indexer.Close(ctx); err != nil {
		fmt.Printf("error flushing and closing indexer: %s\n", err)
	}

	indexerStats := indexer.Stats()
	return &indexerStats, nil
}
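For completeness, here is roughly how the slice is split between the goroutines (a minimal sketch; chunk-boundary and error handling simplified):

func pushAll(e *esClient, items []esutil.BulkIndexerItem, workers int) {
	var wg sync.WaitGroup
	chunk := (len(items) + workers - 1) / workers // ceil(len(items) / workers)
	for start := 0; start < len(items); start += chunk {
		end := start + chunk
		if end > len(items) {
			end = len(items)
		}
		wg.Add(1)
		go func(part []esutil.BulkIndexerItem) {
			defer wg.Done()
			// each goroutine gets its own bulk indexer via push
			if _, err := e.push(part); err != nil {
				fmt.Printf("push failed: %s\n", err)
			}
		}(items[start:end])
	}
	wg.Wait()
}

With 5M items and workers = 200, each goroutine ends up with 25,000 items, matching the setup above.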
Assume no other process is interacting with the index in any way.
Using several pages of the Elasticsearch docs, I was able to find a solution to the problem above. The answer below is based on my understanding; please comment if you find something that can be improved or corrected.
Here is the request lifecycle, as far as I can tell:
1. A bulk request arrives at a coordinating node and is split into one sub-request per target shard.
2. Each sub-request is forwarded to the node holding that shard's primary and handed to that node's write thread pool (in the error above: pool size = 32, queue capacity = 200).
3. If all threads are busy, the sub-request waits in the queue and is removed only when a thread picks it up for execution.
4. If a sub-request arrives while all 32 threads are busy and the queue already holds 200 tasks, the node rejects it with es_rejected_execution_exception.
My issue was that I was sending requests with refresh = false (the default). Instead, refresh = wait_for should have been used. Why? refresh provides 3 modes:

refresh = false (default): the response returns as soon as the request has been acknowledged; the changes only become visible to search after the index's next periodic refresh.

refresh = true: the affected shards are refreshed immediately after the operation, so the changes are visible to search before the response returns. This is the most expensive mode in terms of load on the server.

refresh = wait_for: the response is held back until the next scheduled refresh makes the changes visible. Cheaper than refresh = true (in terms of load on the server), but still ensures the request has been completed before the response is sent back; by that point the request has been removed from the node's queue.

All data was being redirected to the same node, and because of refresh = false, responses were returned before the existing requests were cleared from the queue. The goroutines therefore kept sending new bulk requests faster than the node could drain its write queue, which was causing the overflows; refresh = wait_for delays each response until a refresh and so naturally throttles the senders.
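Separately, these rejections surface to the client as HTTP 429, so the client itself can also be configured to retry them with backoff rather than fail immediately. A sketch using the retry options of elasticsearch.Config in go-elasticsearch (the concrete values are illustrative, not a recommendation):

client, err := elasticsearch.NewClient(elasticsearch.Config{
	// Retry on 429 Too Many Requests in addition to the default 502/503/504.
	RetryOnStatus: []int{502, 503, 504, 429},
	// Linear backoff between attempts; an exponential function works as well.
	RetryBackoff: func(attempt int) time.Duration {
		return time.Duration(attempt) * 100 * time.Millisecond
	},
	MaxRetries: 5,
})
if err != nil {
	log.Fatalf("error creating the client: %s", err)
}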