Tags: groovy, elasticsearch, pyes

How to use Elasticsearch scripting to partially update 500 million documents as fast as possible


I maintain an index with roughly 500 million documents. Amongst others, each document has a string field containing between 1 and 10 words. I'd like to analyze the word count of this field in each document and store the result in a field "wordCount" on the respective document.

I know that there is the partial_update functionality, described here: ES documentation on partial_update

I wonder if a scripted partial_update (maybe with an advanced Groovy script) could be used to significantly speed up the task above. If so, can someone give a hint on how to start?
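
For reference, a single scripted partial update through the official Python client looks roughly like this (a minimal sketch, assuming dynamic Groovy scripting is enabled; the index and type names match my script below, and the document id is a placeholder), but doing this document by document would obviously not solve the round-trip problem:

import elasticsearch

es = elasticsearch.Elasticsearch()

# The word count is computed server-side by the script, so only the script
# goes over the wire instead of the whole document.
es.update(
    index="corpus",
    doc_type="items",
    id="some-doc-id",  # placeholder id
    body={"script": 'ctx._source.keywordLength = ctx._source.keyword.trim().split(" ").size()'}
)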

Currently, I am using the Python script below, but it is painfully slow at this scale, due to the many network round trips and the full-document payloads.

#!/usr/bin/env python
#-*- coding: utf-8 -*-

import elasticsearch
from elasticsearch import helpers
import pyes
from datetime import datetime


def getKeywordLength(text):
    text = text.strip()
    return text.count(" ")+1

indices = ["corpus"]

uri2 = "%s:%d" % ("http://localhost", 9200)
connection2 = pyes.ES([uri2], timeout=2000000)
es = elasticsearch.Elasticsearch(timeout=2000000)

def start():
    elasticSearchIndexName = indices[0]

    ### build the search query to iterate over all records
    squery = '{"sort": [{"timestampUpdated": {"order": "asc","ignore_unmapped": true}}],"query": {"filtered": {"query": {"bool": {"should": [{"query_string": {"query": "*"}}]}}}}}'

    ### fetch a scrolling handle over all records
    items = helpers.scan(es, query=squery, index=elasticSearchIndexName,
                         scroll='360s', size=1000)

    ### iterate over all records
    for i in items:
        try:
            indexName = i["_index"]
            timestamp = datetime.now().isoformat()
            keyword = i["_source"]["keyword"]
            i["_source"]["keywordLength"] = getKeywordLength(keyword)
            i["_source"]["timestampUpdated"] = timestamp
            ### re-index the full document with the two added fields
            result = connection2.index(i["_source"], indexName, "items", id=i['_id'])
            print result
        except:
            ### on any error, restart the whole scan from the beginning
            start()
            return

start()
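
For what it's worth, the round trips can already be reduced client-side by replacing the per-document index() calls above with bulk partial updates (a minimal sketch using the same index, type and field names as the script above; chunk_size is just a starting point to tune, not a recommendation):

#!/usr/bin/env python
import elasticsearch
from elasticsearch import helpers
from datetime import datetime

es = elasticsearch.Elasticsearch(timeout=120)

def update_actions():
    ### stream over all documents and emit one partial-update action each
    for i in helpers.scan(es, index="corpus",
                          query={"query": {"match_all": {}}},
                          scroll='360s', size=1000):
        keyword = i["_source"]["keyword"].strip()
        yield {
            "_op_type": "update",
            "_index": i["_index"],
            "_type": "items",
            "_id": i["_id"],
            ### only the changed fields are sent, not the full _source
            "doc": {
                "keywordLength": keyword.count(" ") + 1,
                "timestampUpdated": datetime.now().isoformat(),
            },
        }

helpers.bulk(es, update_actions(), chunk_size=1000)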

Solution

  • What I usually do when I have plenty of data to bulk-update across millions of documents and can't afford the round trips is to use the update-by-query plugin. The principle is dead simple: it allows you to run a query with the query DSL and, on all the matching documents, run a script to do whatever you like.

    In your case, it would go like this:

    curl -XPOST localhost:9200/corpus/_update_by_query -d '{
        "query": {
            "match_all": {}
        },
        "script": "ctx._source.keywordLength = ctx._source.keyword.trim().split(\" \").size(); ctx._source.timestampUpdated = new Date().format(\"yyyy-MM-dd\");"
    }'
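
    Note that update-by-query is not part of core Elasticsearch at this point; it is a separate plugin that needs to be installed on every node of the cluster before the _update_by_query endpoint becomes available.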
    

    Also note that in order to be able to run this, you need to enable scripting in your elasticsearch.yml file:

    # before ES 1.6
    script.disable_dynamic: false
    
    # since ES 1.6
    script.inline: on
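
    Since these are static settings in elasticsearch.yml, each node needs to be restarted for the change to take effect.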