I am generating a large number of Elasticsearch documents with random content using Python and indexing them with elasticsearch-py.
Simplified working example (document with just one field):
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
for i in range(1, 10000000):
    document = {'my_field': getrandbits(64)}
    es_client.index(index='my_index', document=document)
Since this makes one request per document, I tried to speed it up by sending chunks of 1000 documents each via the _bulk API. However, my attempts so far have been unsuccessful.
My understanding from the docs is that you can pass an iterable to bulk(), so I tried:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1, 10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=document_list, index='my_index')
        document_list = []
but this results in an
elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')
OK, it seems I had mixed up two different functions: helpers.bulk() and Elasticsearch.bulk(). Either one can be used to achieve what I intended, but they have slightly different signatures.
The helpers.bulk() function takes an Elasticsearch() object and an iterable containing the documents as parameters. The operation can be specified as _op_type and can be one of index, create, delete, or update. Since _op_type defaults to index, we can simply omit it and pass the plain list of documents in this case:
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1, 10000000):
    document = {'my_field': getrandbits(32)}
    document_list.append(document)
    if i % 1000 == 0:
        helpers.bulk(es_client, document_list, index='my_index')
        document_list = []
# flush any documents left over after the last full chunk of 1000
if document_list:
    helpers.bulk(es_client, document_list, index='my_index')
This works fine.
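For the other operations, the _op_type and any metadata go into each action dict. As a minimal, hypothetical sketch (the IDs 1..1000 and the value 0 are made up, not part of my actual setup), a partial update of existing documents in my_index could look like this:
from elasticsearch import Elasticsearch, helpers
es_client = Elasticsearch('https://elastic.host:9200')
# hypothetical example: partially update existing documents with ids 1..1000
update_actions = [
    {'_op_type': 'update',
     '_index': 'my_index',
     '_id': doc_id,
     'doc': {'my_field': 0}}
    for doc_id in range(1, 1001)
]
helpers.bulk(es_client, update_actions)
A delete works the same way, just with '_op_type': 'delete' and no document body.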
Alternatively, the Elasticsearch.bulk() function can be used, but here the actions/operations are a mandatory part of the iterable and the syntax is slightly different: instead of just a dict with the document contents, the iterable has to hold an action/metadata dict (in this case {"index": {}}) followed by the document body for each document, mirroring the newline-delimited format of the _bulk REST API. See also the _bulk documentation:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
actions_list = []
for i in range(1, 10000000):
    document = {'my_field': getrandbits(32)}
    # each document needs an action/metadata line followed by a source line
    actions_list.append({'index': {}})
    actions_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=actions_list, index='my_index')
        actions_list = []
# flush any remaining action/document pairs
if actions_list:
    es_client.bulk(operations=actions_list, index='my_index')
This works fine as well.
I assume that both of the above generate the same _bulk REST API request internally, so they should be equivalent in the end.
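For illustration, the request body that ends up at the _bulk endpoint in both cases is newline-delimited JSON roughly like this (the values below are just made-up examples), with my_index supplied as the default index in the request path:
{"index": {}}
{"my_field": 2472207783}
{"index": {}}
{"my_field": 907755173}
...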
UPDATE:
As pointed out by Johan, the helpers.bulk() function takes care of the chunking itself (it calls helpers.streaming_bulk() internally), so there is no need to manually build and pass it lists of 1000 documents. For my final solution, I ended up writing a generator function that yields one document / action at a time anyway. This can then be passed directly to helpers.streaming_bulk(), along with a chunk_size of your choosing (the default value is 500):
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
def doc_stream():
    ''' generator function for stream of actions '''
    for i in range(1, 10000000):
        yield {'_index': 'my_index',
               '_source': {'my_field': getrandbits(32)}}

for status_ok, response in helpers.streaming_bulk(es_client,
                                                  actions=doc_stream(),
                                                  chunk_size=1000,
                                                  raise_on_error=False):
    if not status_ok:
        # a document failed to index: with raise_on_error=False the failed
        # action is yielded here instead of raising a BulkIndexError
        print(response)
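If a single request stream still isn't fast enough, the same kind of generator can be fed to helpers.parallel_bulk(), which sends the chunks from a thread pool. This is only a sketch I have not benchmarked, reusing the doc_stream() generator from above with an arbitrarily chosen thread_count of 4:
# parallel_bulk() returns a lazy generator, so it has to be consumed
# (e.g. by iterating over it) for any indexing to happen
for status_ok, response in helpers.parallel_bulk(es_client,
                                                 doc_stream(),
                                                 thread_count=4,
                                                 chunk_size=1000):
    if not status_ok:
        print(response)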