Tags: python, json, elasticsearch, bulkinsert, elasticsearch-py

Bulk index / create documents with elasticsearch for python


I am generating a large number of Elasticsearch documents with random content using Python and indexing them with elasticsearch-py.

Simplified working example (document with just one field):

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    es_client.index(index='my_index', document=document)

Since this makes one request per document, I tried to speed it up by sending chunks of 1000 documents each using the _bulk API. However, my attempts so far have been unsuccessful.

My understanding from the docs is that you can pass an iterable to bulk(), so I tried:

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=document_list, index='my_index')
        document_list = []

but this results in the following error:

elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')


Solution

  • Ok, it seems I had mixed up two different functions: helpers.bulk() and Elasticsearch.bulk(). Either one can be used to achieve what I intended to do, but they have slightly different signatures.

    The helpers.bulk() function takes an Elasticsearch() instance and an iterable of documents as parameters. The operation can be specified per action via the _op_type field and can be one of index, create, delete, or update. Since _op_type defaults to index, we can just omit it here and simply pass the plain documents:

    from elasticsearch import Elasticsearch, helpers
    from random import getrandbits
    
    es_client = Elasticsearch('https://elastic.host:9200')
    
    document_list = []
    for i in range(1,10000000):
        document = {'my_field': getrandbits(32)}
        document_list.append(document)
        if i % 1000 == 0:
            helpers.bulk(es_client, document_list, index='my_index')
            document_list = []
    if document_list:
        # index the final partial chunk that the modulo check above skips
        helpers.bulk(es_client, document_list, index='my_index')
    

    This works fine.
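
    As a side note, if you need a different operation or an explicit document id, helpers.bulk() also accepts action dicts carrying _op_type, _index, _id and _source keys. A minimal sketch (the id scheme here is just a placeholder):

    from elasticsearch import Elasticsearch, helpers
    from random import getrandbits

    es_client = Elasticsearch('https://elastic.host:9200')

    # 'create' refuses to overwrite documents whose id already exists
    actions = [{'_op_type': 'create',
                '_index': 'my_index',
                '_id': i,
                '_source': {'my_field': getrandbits(32)}}
               for i in range(1000)]
    helpers.bulk(es_client, actions)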

    The Elasticsearch.bulk() function can be used as an alternative, but here the actions/operations have to be part of the iterable itself, and the syntax mirrors the newline-delimited format of the _bulk REST API. This means that instead of just a dict with the document contents, the list has to contain an action/metadata entry (in this case {"index": {}}) followed by the document source for each document. See also the _bulk documentation:

    from elasticsearch import Elasticsearch
    from random import getrandbits
    
    es_client = Elasticsearch('https://elastic.host:9200')
    
    actions_list = []
    for i in range(1,10000000):
        document = {'my_field': getrandbits(32)}
        # one action/metadata entry followed by the document source
        actions_list.append({"index": {}})
        actions_list.append(document)
        if i % 1000 == 0:
            es_client.bulk(operations=actions_list, index='my_index')
            actions_list = []
    if actions_list:
        # index the final partial chunk that the modulo check above skips
        es_client.bulk(operations=actions_list, index='my_index')
    

    This works fine as well.

    I assume that both of the above generate the same _bulk REST API request internally, so they should be equivalent in the end.
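
    For reference, the request body that ends up at the _bulk endpoint should look roughly like this in both cases (newline-delimited JSON, one action/metadata line followed by one source line per document; the exact metadata differs slightly depending on whether the index name goes into the URL or into each action):

    POST /my_index/_bulk
    {"index":{}}
    {"my_field":1234567890}
    {"index":{}}
    {"my_field":987654321}
    ...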


    UPDATE:

    As pointed out by Johan, the helpers.bulk() function already takes care of the chunking (it calls helpers.streaming_bulk() internally), so there is no need to manually hand it lists of 1000 documents at a time. For my final solution, I ended up writing a generator function that yields one document / action at a time anyway. It can be passed directly to helpers.streaming_bulk(), together with a chunk_size of your choosing (the default is 500):

    from elasticsearch import Elasticsearch, helpers
    from random import getrandbits

    es_client = Elasticsearch('https://elastic.host:9200')

    def doc_stream():
        ''' generator function for a stream of actions '''
        for i in range(1,10000000):
            yield {'_index': 'my_index',
                   '_source': {'my_field': getrandbits(32)}}

    for status_ok, response in helpers.streaming_bulk(
            es_client, actions=doc_stream(), chunk_size=1000):
        if not status_ok:
            # if inserting a document failed, log the response
            print(response)