Tags: elasticsearch, elastic-stack, elasticsearch-8

Merge the values of two indices by a common field in Elasticsearch


PUT _enrich/policy/merge
{
  "match": {
    "indices": ["es.event-154.23-09-01-6-27-22"],
    "match_field": "ancestor_id",
    "enrich_fields": [
      "status",
      "status_id",
      "type",
      "sub_type",
      "primary_assigned",
      "vehicle_of_interest",
      "dms_ro_number",
      "dms_deal_number",
      "primary_assigned_user_name",
      "dealership_id",
      "ancestor_id",
      "event_id",
      "deleted",
      "service_date",
      "update_date",
      "secondary_assigned_user_name",
      "bdc_assigned_user_name"
    ]
  }
}

PUT _ingest/pipeline/enrich
{
  "processors": [
    {
      "enrich": {
        "description": "Add Events to customer",
        "policy_name": "merge",
        "field": "contact_id",
        "target_field": "events",
        "max_matches": 1
      }
    }
  ]
}

POST _reindex?wait_for_completion=false
{
  "dest": {
    "pipeline": "enrich",
    "index": "es.customerevent1-154.25-08-09-6-27-22"
  },
  "source": { "index": "es.customer-154.24-08-09-6-27-22" }
}

Here, I am trying to add values from one index to the documents of another. If there are multiple matches, which record does the enrich processor take? It looks like it takes a random record among those matching the match field. Is it possible to specify that matches be ordered ascending/descending by a field? Please help me fix this.


Solution

  • Great progress so far!

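    One way to achieve this is to store all matches in an array (here, up to 10 matches are stored in `tmpArray`) and then pick the one(s) that suit you best using a script processor (here, `tmpArray` is sorted by `event_id` in descending order and the top-ranked match(es) are stored in the `events` field). Note that the enrich processor returns at most `max_matches` matches (capped at 128) in no guaranteed order, so the sort can only choose among the matches it actually returned. Here is the pipeline: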

    PUT _ingest/pipeline/enrich
    {
      "processors": [
        {
          "enrich": {
            "description": "Add Events to customer",
            "policy_name": "merge",
            "field": "contact_id",
            "target_field": "tmpArray",
            "max_matches": 128
          }
        },
        {
          "script": {
            "description": "Keep only the enriched event with the highest event_id",
            "if": "ctx.tmpArray != null",
            "source": """
            // Orders matches by event_id, descending. Entries without an event_id
            // sort last; returning 0 whenever either side is null would break the
            // Comparator contract and make the resulting order unpredictable.
            int compareByEventIdDesc(Map a, Map b) {
              def left = a.event_id;
              def right = b.event_id;
              if (left == null) {
                return right == null ? 0 : 1;
              }
              if (right == null) {
                return -1;
              }
              return right.compareTo(left);
            }
            // With max_matches > 1 the enrich processor writes a list. It holds at
            // most max_matches entries (128 is the allowed maximum) in no guaranteed
            // order, so the best match is chosen among the returned ones only.
            List ranked = ctx.tmpArray.stream()
              .sorted(this::compareByEventIdDesc)
              .collect(Collectors.toList());
            if (!ranked.isEmpty()) {
              // Store a single record, matching the shape produced by max_matches: 1.
              ctx.events = ranked.get(0);
            }
            ctx.remove('tmpArray');
            """
          }
        }
      ]
    }