
Update all Elasticsearch docs using a dict as input with Python


Update All Documents

Background Information

I have a use case where I need to update all documents in my index. My search results look similar to the following:

{
  'hits': [
   {'_index': 'main-index-v2',
    '_type': '_doc',
    '_id': 'ID_xzeta4955029dhs82901',
    '_score': 8.403202,
    '_source': {'id': 'ID_xzeta4955029dhs82901',
        'employee_ids': ['J98234', 'J28373', 'CH13561', 'J98823', 'J12294'],
        'non_employee_ids': [],
        'friends_id': ['G8667', 'J98923', 'J28373', 'H82739', 'J98823'],
        'local_date': '2022/01/10',
        'local': True,
    ...
} 

I can easily search my index using a multi_match query, but this only works for a single ID at a time.

def create_multi_query(ids: str, fields: list = None):
    # Avoid a mutable default argument; fall back to the three ID fields.
    fields = fields or ['employee_ids', 'non_employee_ids', 'friends_id']
    return {
        "query": {
            "multi_match": {
                "query": ids,
                "fields": fields,
                "operator": "or"
            }
        }
    }

hits = es.search(index='main-index-v2', body=create_multi_query('G8667'), scroll='2m')

I want to provide a dictionary and list of fields as parameters to update my index.

Example:

{'J1234': 'J2875', 'CH1234': 'J2879'}

The dictionary maps old IDs to new IDs. I want to update every field that contains an old ID.
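To make the intended result concrete, here is a minimal plain-Python sketch of the transformation applied to a single `_source` dict. It only illustrates the expected outcome and is not how Elasticsearch performs the update. The helper name `remap_ids` and the sample document values are hypothetical.

```python
def remap_ids(source: dict, id_map: dict, fields: list) -> dict:
    """Return a copy of `source` with old IDs in `fields` replaced via `id_map`."""
    updated = dict(source)
    for field in fields:
        value = updated.get(field)
        if isinstance(value, list):
            # Replace each matching item, keep everything else unchanged.
            updated[field] = [id_map.get(item, item) for item in value]
        elif isinstance(value, str) and value in id_map:
            # A field holding a single ID instead of a list.
            updated[field] = id_map[value]
    return updated


# Illustrative input only; these values are made up for the example.
doc = {
    'employee_ids': ['J1234', 'J28373'],
    'non_employee_ids': [],
    'friends_id': ['G8667', 'CH1234'],
}
id_map = {'J1234': 'J2875', 'CH1234': 'J2879'}
fields = ['employee_ids', 'non_employee_ids', 'friends_id']

result = remap_ids(doc, id_map, fields)
print(result)
```

IDs that do not appear in the dictionary are left untouched, and the original `doc` is not modified.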

My Solution (Thus far)

I have written a Painless script to update the IDs, but it requires a separate for loop for each field. For each field, the script iterates over the list: if the current item matches the parameter 'fromId', it appends 'toId' to a new list; otherwise it appends the current item unchanged. It then sets the field to the new list.

Painless Script example

def result = [];
for (def item : ctx._source.employee_ids) {
    if (item == params.fromId) {
        result.add(params.toId);
    } else {
        result.add(item);
    }
}
ctx._source.employee_ids = result;

def resultF = [];
for (def item : ctx._source.friends_id) {
    if (item == params.fromId) {
        resultF.add(params.toId);
    } else {
        resultF.add(item);
    }
}
ctx._source.friends_id = resultF;

This can be executed via UpdateByQuery from the elasticsearch_dsl library.

Example of the Update call.


from elasticsearch import exceptions
from elasticsearch_dsl import UpdateByQuery

# `retry`, `auth_es`, `settings` and `UPDATE_SCRIPT` are defined elsewhere in my project.

def partial_update(es, items: dict):
    # Fail fast if the cluster is unreachable.
    assert es.ping() is True
    results = []
    # One update-by-query call per (old ID, new ID) pair.
    for from_id, to_id in items.items():
        results.append(execute_intermediate(from_id, to_id))
    return results

@retry((exceptions.ConflictError, exceptions.ConnectionError, exceptions.RequestError), value_type=dict, tries=3, delay=2, backoff=1)
def execute_intermediate(from_id, to_id):
    ubq = UpdateByQuery(
        using=auth_es(),
        doc_type='doc', index=settings.ES_WRITE_INDEX,
    )
    ubq = ubq.script(source=UPDATE_SCRIPT, lang='painless', params={'fromId': from_id, 'toId': to_id})
    ubq = ubq.params(wait_for_completion=True)
    return ubq.execute().to_dict()

The intermediate function executes the update for a single ID pair and is wrapped with a retry decorator.

Issues

  1. This approach requires looping through the dictionary and issuing one update call per ID pair.

  2. To update additional fields, I need to add a new for loop to the script for each one.

Questions

What is the most efficient way to update all of these fields in _source based on the above?

Is there a way to send a dictionary, find all documents matching its keys, and replace them with the corresponding values in a single call?


Solution

  • There is no out-of-the-box solution for this.

    One improvement to the existing Painless script is to modify the arrays in place, passing a map of old-to-new IDs and a list of target fields in params.

    PUT /test_replace_id/
    {
      "mappings": {
        "properties": {
          "employee_ids":{
            "type": "keyword"
          }
        }
      }
    }
    
    POST /test_replace_id/_doc/1
    {
      "employee_ids": ["old1","old2"],
      "friends_id": "old1"
    }
    
    POST /test_replace_id/_update/1
    {
      "script": {
        "source": """
          for (t in params.targets){
            if (ctx._source[t] instanceof List){
              for (int j=0; j<ctx._source[t].length; j++){
                if (params.map.containsKey(ctx._source[t][j])) {
                  ctx._source[t][j] = params.map.get(ctx._source[t][j])
                }
              }
            }else{
              if (params.map.containsKey(ctx._source[t])) {
                ctx._source[t] = params.map.get(ctx._source[t])
              }
            }
          }
        """,
        "params":{
          "targets": ["employee_ids","friends_id"],
          "map": {"old1":"new1"}
        }
      }
    }
    GET /test_replace_id/_search
    

    This is more flexible: there is no need to iterate over the dictionary and issue one update per ID. The entire map can be sent in a single request.

    Thanks to @Tomo_M for the solution!
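To tie the script above back to the Python side of the question, here is a rough sketch of how a request body for a single update-by-query call could be built from the dictionary. The helper name `build_update_body` is hypothetical. The `terms` filter that restricts the update to documents containing at least one old ID is an addition not present in the answer, and it assumes the ID fields are mapped as `keyword` (as in the test index above); with analyzed `text` fields it may not match.

```python
# Painless script from the answer above (whitespace tidied, logic unchanged).
UPDATE_SCRIPT = """
for (t in params.targets) {
  if (ctx._source[t] instanceof List) {
    for (int j = 0; j < ctx._source[t].length; j++) {
      if (params.map.containsKey(ctx._source[t][j])) {
        ctx._source[t][j] = params.map.get(ctx._source[t][j])
      }
    }
  } else {
    if (params.map.containsKey(ctx._source[t])) {
      ctx._source[t] = params.map.get(ctx._source[t])
    }
  }
}
"""


def build_update_body(id_map: dict, fields: list) -> dict:
    """Build an update-by-query body that remaps IDs in `fields` using `id_map`."""
    old_ids = list(id_map)
    return {
        # Assumption: keyword-mapped fields, so `terms` matches exact IDs.
        "query": {
            "bool": {
                "should": [{"terms": {field: old_ids}} for field in fields],
                "minimum_should_match": 1,
            }
        },
        "script": {
            "source": UPDATE_SCRIPT,
            "lang": "painless",
            "params": {"targets": fields, "map": id_map},
        },
    }


body = build_update_body(
    {'J1234': 'J2875', 'CH1234': 'J2879'},
    ['employee_ids', 'non_employee_ids', 'friends_id'],
)

# Hypothetical call with a 7.x-style client instance `es`, as used in the question:
# res = es.update_by_query(index='main-index-v2', body=body, conflicts='proceed')
```

Adding another field then only means extending the `fields` list; the script itself does not change. Dropping the `query` key would make the script run against every document in the index.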