Tags: elasticsearch, rust, parallel-processing, throttling, bulk

Throttle Elasticsearch server which is seemingly being overwhelmed by bulk postings?


I'm new to Rust and am converting a Python app which takes several hundred MS Word documents, parses out all their text, and divides the text of each Word document into an overlapping sequence of 10-line Lucene Documents ("LDocs"), which are then added to the index using the _bulk endpoint.

The exciting opportunity Rust gives is to do proper parallelism, and I naively thought I had done this successfully: for each Word document, in its own thread, I build a bulk string in the specific ES bulk (NDJSON) format and then, at the end of the Word document, submit it (this ES 8.6.2 server is running on port 9500):

reqwest_client
.post("https://localhost:9500/my_index/_bulk")
.header("Content-type", "application/x-ndjson")
.body(self.bulk_post_str.to_string())
.basic_auth("mike12", Some("mike12"))
.send()
.await?
.text()
.await?;

... everything seemed to be working fine ... until I compared the number of "LDocs" supposedly submitted with the number found after the process ends using the _count endpoint: typically 29294 submitted, but only 18638 actually made it into the index!

So I decided to examine what was actually happening with those Rust `?`s in the code above. The typical error (at `... send().await`) is "sending request for url (https://localhost:9500/my_index/_bulk): dispatch task is gone: runtime dropped the dispatch task".

From what I've read so far, it appears that the culprit here is the ES server. But how do I detect a rejected bulk post? I've examined the string output from the second await above, and it says errors: false.
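For reference: the top-level errors flag in a _bulk response is false only when every item in the request succeeded, so a response with errors: false really did index everything it contained. A minimal way to guard on that flag without pulling in a JSON parser is a naive string scan of the response body; this is only a sketch, and parsing the response with serde_json would be more robust:

```rust
// Conservative check of an Elasticsearch _bulk response body.
// The real response is JSON like {"took":5,"errors":false,"items":[...]};
// this naive string scan is a sketch -- a proper JSON parse is more robust.
fn bulk_had_errors(body: &str) -> bool {
    // ES emits the flag without spaces, but tolerate whitespace anyway.
    body.contains("\"errors\":true") || body.contains("\"errors\": true")
}

fn main() {
    let ok = r#"{"took":5,"errors":false,"items":[]}"#;
    let bad = r#"{"took":5,"errors":true,"items":[{"index":{"status":429}}]}"#;
    assert!(!bulk_had_errors(ok));
    assert!(bulk_had_errors(bad));
}
```

When errors is true, the per-item entries in the items array carry the individual status codes (429 being the classic "bulk queue full" rejection).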

What I thought of doing
So my thought was "if a bulk post fails, try, try and try again". So I changed to this:

    let mut n_loop = 0;
    loop {
        n_loop += 1;
        // corrected on the suggestion of kmdreko (note that the sleep
        // future must be .awaited, otherwise it never actually pauses):
        // std::thread::sleep(std::time::Duration::from_millis(1));
        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
        if stop_detected() {
            return Ok(())
        }
        let url = format!("https://localhost:9500/{}/_bulk", INDEX_NAME.read().unwrap());
        let send_result = reqwest_client
            .post(url)
            .header("Content-type", "application/x-ndjson")
            .body(self.bulk_post_str.to_string())
            .basic_auth("mike12", Some("mike12"))
            .send()
            .await;
        let send_value = match send_result {
            Ok(send_value) => send_value,
            Err(e) => {
                warn!("attempt {n_loop} SEND post_bulk_string error was {}", e);
                continue
            },
        };
        let text_result = send_value.text().await;
        let _text_value = match text_result {
            Ok(text_value) => text_value,
            Err(e) => {
                warn!("attempt {n_loop} TEXT post_bulk_string error was {}", e);
                continue
            },
        };

        // TODO check that "errors" here is "false"
        // info!(">> text_value |{}...|", &text_value[0..100]);
        break
    }
    Ok(())

... now I get wildly inconsistent results: sometimes the ES server seems to foul up completely, I see up to 6 attempts to post the bulk string, and I end up with fewer "LDocs" than there should be.
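Retrying almost immediately in a tight loop just hammers a server that is already saturated; backing off exponentially between attempts gives it room to drain its queue. A sketch of the delay calculation only (the 100 ms base and 10 s cap are arbitrary choices of mine, to be tuned against the real server):

```rust
use std::time::Duration;

// Exponential backoff: 100 ms, 200 ms, 400 ms, ... capped at 10 s.
// Base and cap are arbitrary illustrative values.
fn backoff_delay(attempt: u32) -> Duration {
    let base_ms: u64 = 100;
    let cap_ms: u64 = 10_000;
    let ms = base_ms.saturating_mul(1u64 << attempt.min(16));
    Duration::from_millis(ms.min(cap_ms))
}

fn main() {
    assert_eq!(backoff_delay(0), Duration::from_millis(100));
    assert_eq!(backoff_delay(1), Duration::from_millis(200));
    assert_eq!(backoff_delay(10), Duration::from_secs(10)); // hits the cap
}
```

In the retry loop, the fixed 1 ms pause could then become `tokio::time::sleep(backoff_delay(n_loop)).await` (note the `.await`: a Tokio sleep future that is never awaited does not pause at all).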

Most of the time, though, I now find I'm getting MORE "LDocs" in the index than there should be: 43062 from the _count endpoint against 29294 counted during parsing. So I'm beginning to wonder whether some of these send().await calls are in fact working (partially?) while nevertheless returning an Err, so that the retry posts the same data twice.
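One standard way to make a retried bulk POST safe is to give every LDoc a deterministic _id in its bulk action line: Elasticsearch then overwrites a document that already has that id instead of auto-generating a fresh one, so a replayed request cannot inflate the count. A sketch, where the "{doc_name}-{chunk_no}" id scheme and the field names are hypothetical, and text is assumed to be already JSON-escaped:

```rust
// Build one NDJSON _bulk entry whose action line carries a deterministic
// _id, so a retried POST overwrites rather than duplicates the document.
// The "{doc_name}-{chunk_no}" id scheme is a hypothetical example.
fn bulk_entry(index: &str, doc_name: &str, chunk_no: usize, text: &str) -> String {
    let action = format!(r#"{{"index":{{"_index":"{index}","_id":"{doc_name}-{chunk_no}"}}}}"#);
    let source = format!(r#"{{"text":"{text}"}}"#); // text must be pre-escaped JSON
    format!("{action}\n{source}\n")
}

fn main() {
    let entry = bulk_entry("my_index", "report.docx", 3, "some text");
    assert!(entry.contains(r#""_id":"report.docx-3""#));
    // Two entries built for the same chunk are byte-identical,
    // so replaying the bulk body cannot create a second copy.
    assert_eq!(entry, bulk_entry("my_index", "report.docx", 3, "some text"));
}
```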

Is there a recommended way to detect problems with the ES server and then implement some sort of throttling mechanism when _bulk requests are being made from multiple threads?
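One generic throttling mechanism is a counting semaphore: each worker thread must take a permit before POSTing and returns it afterwards, which caps the number of in-flight _bulk requests regardless of how many threads exist. A stdlib sketch of the idea (in async code, tokio::sync::Semaphore plays the same role; the cap of 4 is an arbitrary example):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A counting semaphore built from Mutex + Condvar, capping how many
// threads may have a _bulk request in flight at once.
struct BulkSemaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl BulkSemaphore {
    fn new(n: usize) -> Self {
        BulkSemaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap(); // block until a permit is returned
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

fn main() {
    // Allow at most 4 bulk posts in flight across 16 worker threads.
    let sem = Arc::new(BulkSemaphore::new(4));
    let handles: Vec<_> = (0..16)
        .map(|_| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                sem.acquire();
                // ... perform the bulk POST here ...
                sem.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```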

And/or, for that matter, is there a way to increase the ES server's capacity to accept and process incoming bulk posts? At the moment, for example, I think I've only got one "shard". I don't really understand much about ES and shards, as I have only ever used ES on my local machine (OS = W10), i.e. a "single node" (?).

Code which calls the above

let rt = Runtime::new().unwrap();
rt.block_on(async move {
    if stop_detected() {
        return
    }
    if let Err(e) = docx_document.post_bulk_string(&reqwest_client_clone).await {
        warn!("post_bulk_string error was {}", e);
    }
});

Each parallel thread thus creates its own new Runtime. I have no idea what other options there are, or what problems doing it this way may cause.


Solution

  • The errors and problems here were largely due to the fact that I had not realised that there is a blocking version of reqwest::Client, i.e. reqwest::blocking::Client. My code absolutely did not need to be bothering* with asynchronous handling, await and Futures. The solution is to enable the blocking feature on reqwest in Cargo.toml.
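    For reference, enabling the feature looks like this in Cargo.toml (the version number here is illustrative, not necessarily the one this project uses):

    ```toml
    [dependencies]
    # "blocking" pulls in reqwest::blocking::Client alongside the async client
    reqwest = { version = "0.11", features = ["blocking"] }
    ```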

    For my several hundred Word documents, the calculated number of "LDocs" (Lucene Documents) generated was 29294. Now that I am using a non-async solution, the figure produced when I use the _count endpoint on the ES index is pretty consistently 29564, which suggests there may be something I need to tweak in my counting method during parsing.

    Assuming the process ends correctly, the _count endpoint sometimes delivers a higher figure than that, and occasionally a lower one. At the moment I'm not quite sure whether there is more I need to do to ensure the ES server consistently takes on the right number of LDocs, no more and no fewer, and I will continue to monitor things.

    Later
    Firstly, it is probably a good idea to use a thread pool with no more threads than the number of virtual cores on your machine. You can get this figure in Rust as follows:

    let n_virtual_cores = std::thread::available_parallelism().unwrap().get();
    

    Setting a larger figure for the thread pool (or spawning an unlimited number of individual threads) seems to cause more timeouts. This is logical enough: however the ES server works, there's no way it can usefully service more than n_virtual_cores request threads simultaneously.
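    The sizing advice above can be sketched as follows: spawn at most available_parallelism() workers and hand each a share of the documents, rather than one unbounded thread per document. The process_all/process_doc names are mine, standing in for the real parse-and-bulk-post work:

    ```rust
    use std::thread;

    // Bounded fan-out: at most available_parallelism() worker threads,
    // each handling a contiguous batch of documents. `process_doc` is a
    // stand-in for the real parse-and-bulk-post work. Returns the number
    // of documents processed.
    fn process_all(docs: Vec<String>, process_doc: fn(&str)) -> usize {
        let n_workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
        let chunk_size = ((docs.len() + n_workers - 1) / n_workers).max(1);
        let handles: Vec<_> = docs
            .chunks(chunk_size)
            .map(|batch| {
                let batch = batch.to_vec();
                thread::spawn(move || {
                    for doc in &batch {
                        process_doc(doc);
                    }
                    batch.len()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    }

    fn main() {
        let docs: Vec<String> = (0..10).map(|i| format!("doc{i}")).collect();
        assert_eq!(process_all(docs, |_doc| {}), 10);
    }
    ```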

    Secondly, if a timeout occurs in the send method, it often seems to be the case that the bulk POST has in fact already taken effect. So simply repeating the bulk POST would be the wrong thing to do, as it results in duplicate LDocs. In such a case, deleting the LDocs likely to have been added and then repeating the bulk POST would appear to be necessary, but that would be very complicated. Better to set a generous timeout, maybe 5 seconds (my bulk strings here are enormous).

    Thirdly, back in the Python code calling this Rust module, it seems that even after the Rust module has returned you have to wait a significant time, maybe 2 seconds, before the ES server has been able to "digest" these bulk POSTs. If you don't, you get the wrong answer from the _count endpoint. Maybe there's some way to find out from the server itself whether it is still "pending digestion"...
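    This "digestion" delay is most likely Elasticsearch's refresh cycle: newly indexed documents only become visible to searches (and to _count) after the next refresh, which by default happens roughly once a second; a POST to the index's _refresh endpoint forces it. Failing that, one simple option is to poll _count until it stops changing. A stdlib sketch of that polling loop, with the HTTP call abstracted into a closure (the closure in main is a simulated server, not real data):

    ```rust
    use std::time::Duration;

    // Poll a count until two consecutive reads agree, i.e. the index has
    // settled. `get_count` stands in for a GET on the _count endpoint.
    fn wait_until_stable(mut get_count: impl FnMut() -> u64, poll: Duration) -> u64 {
        let mut last = get_count();
        loop {
            std::thread::sleep(poll);
            let now = get_count();
            if now == last {
                return now;
            }
            last = now;
        }
    }

    fn main() {
        // Simulated server: the count climbs and then settles.
        let mut readings = vec![10_000u64, 25_000, 29_294, 29_294].into_iter();
        let settled = wait_until_stable(
            || readings.next().unwrap_or(29_294),
            Duration::from_millis(1),
        );
        assert_eq!(settled, 29_294);
    }
    ```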


    * Of course, the problem with using a blocking method is that you can't shut the thread down while a request is in flight. reqwest::blocking::ClientBuilder has a timeout method, whose documentation says the Duration defaults to 30 s. For my ES server running on my local machine I've set it shorter.