elasticsearch, elasticsearch-opendistro, opensearch

Adding a new document to a separate index using Elasticsearch processors


Is there a way to populate a separate index when I index some document(s)?

Let's assume I have something like:

PUT person/_doc/1
{
  "name": "John Doe",
  "languages": ["english", "spanish"]
}

PUT person/_doc/2
{
  "name": "Jane Doe",
  "languages": ["english", "russian"]
}

What I want is that every time a person is added, each of that person's languages is added to a languages index.

Something like:

GET languages/_search

would give:

...
"hits" : [
  {
    "_index" : "languages",
    "_type" : "doc",
    "_id" : "russian",
    "_score" : 1.0,
    "_source" : {
      "value" : "russian"
    }
  },
  {
    "_index" : "languages",
    "_type" : "doc",
    "_id" : "english",
    "_score" : 1.0,
    "_source" : {
      "value" : "english"
    }
  },
  {
    "_index" : "languages",
    "_type" : "doc",
    "_id" : "spanish",
    "_score" : 1.0,
    "_source" : {
      "value" : "spanish"
    }
  }
...

I was thinking of ingest pipelines, but I don't see any processor that allows such a thing.

Maybe the answer is to create a custom processor. I have one already, but I'm not sure how I could insert a document into a separate index from there.


Update: Using transforms as described in @Val's answer works, and seems to be the right answer indeed.

However, I am using Open Distro for Elasticsearch and transforms are not available there. Some alternative solution that works there would be greatly appreciated :)


Update 2: It looks like OpenSearch is replacing Open Distro for Elasticsearch, and it has a transform API \o/


Solution

  • A document entering an ingest pipeline cannot be cloned or split, as is possible in Logstash for instance. So from a single incoming document, you cannot index two documents.

    However, just after indexing your person documents, it's definitely possible to hit the _transform API endpoint and create the languages index from the person one:

    First create the transform:

    PUT _transform/languages-transform
    {
      "source": {
        "index": "person"
      },
      "pivot": {
        "group_by": {
          "language": {
            "terms": {
              "field": "languages.keyword"
            }
          }
        },
        "aggregations": {
          "count": {
            "value_count": {
              "field": "languages.keyword"
            }
          }
        }
      },
      "dest": {
        "index": "languages",
        "pipeline": "set-id"
      }
    }
    

    You also need to create the pipeline that will set the proper ID for your language documents:

    PUT _ingest/pipeline/set-id
    {
      "processors": [
        {
          "set": {
            "field": "_id",
            "value": "{{language}}"
          }
        }
      ]
    }
    

    Then, you can start the transform:

    POST _transform/languages-transform/_start
    

    When it's done, you'll have a new index called languages with the following content:

    GET languages/_search
    =>
    "hits" : [
      {
        "_index" : "languages",
        "_type" : "_doc",
        "_id" : "english",
        "_score" : 1.0,
        "_source" : {
          "count" : 4,
          "language" : "english"
        }
      },
      {
        "_index" : "languages",
        "_type" : "_doc",
        "_id" : "russian",
        "_score" : 1.0,
        "_source" : {
          "count" : 2,
          "language" : "russian"
        }
      },
      {
        "_index" : "languages",
        "_type" : "_doc",
        "_id" : "spanish",
        "_score" : 1.0,
        "_source" : {
          "count" : 2,
          "language" : "spanish"
        }
      }
    ]
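
A note on the count values above: as far as I can tell, value_count counts every languages.keyword value found in the documents of a bucket, not the number of persons, which would explain english getting 4 from only two persons. Here is a rough Python simulation of that behaviour, assuming the two person documents from the question. The pivot_languages helper is hypothetical and only mimics the transform; it does not call Elasticsearch.

```python
from collections import defaultdict

# The two sample person documents from the question (only the field we pivot on).
people = [
    {"languages": ["english", "spanish"]},
    {"languages": ["english", "russian"]},
]


def pivot_languages(docs):
    """Mimic the pivot: one bucket per language, count = values seen in the bucket."""
    counts = defaultdict(int)
    for doc in docs:
        values = doc.get("languages", [])
        # A document lands in the bucket of each distinct language it contains,
        # and contributes ALL of its language values to that bucket's count.
        for language in set(values):
            counts[language] += len(values)
    # The set-id pipeline uses the language itself as the document ID.
    return [
        {"_id": language, "language": language, "count": counts[language]}
        for language in sorted(counts)
    ]


result = pivot_languages(people)
for row in result:
    print(row)
```

With the two sample documents this yields 4 for english and 2 each for russian and spanish, matching the search output above.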
    

    Note that you can also put that transform on a schedule so that it runs regularly, or run it manually whenever it suits you, to rebuild the languages index.


    OpenSearch has its own _transform API. It works slightly differently; the transform could be created this way:

    PUT _plugins/_transform/languages-transform
    {
      "transform": {
        "enabled": true,
        "description": "Insert languages",
        "schedule": {
          "interval": {
            "period": 1,
            "unit": "minutes"
          }
        },
        "source_index": "person",
        "target_index": "languages",
        "data_selection_query": {
          "match_all": {}
        },
        "page_size": 1,
        "groups": [{
          "terms": {
            "source_field": "languages.keyword",
            "target_field": "value"
          }
        }]
      }
    }
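
For clusters without any transform API (the Open Distro case mentioned in the question), one possible workaround is to do the split on the client side: since a pipeline cannot turn one document into two, the client can send the person and its language documents together in a single _bulk request. The sketch below only builds the NDJSON body. The build_bulk_body helper is a hypothetical name, and the index names and the "value" field are taken from the question; sending the body (a POST to _bulk with Content-Type application/x-ndjson) is left to whatever HTTP client you use.

```python
import json


def build_bulk_body(person_id, person):
    """Build an NDJSON _bulk body: one person doc plus one doc per language."""
    lines = [
        # Action line, followed by the document source line.
        {"index": {"_index": "person", "_id": person_id}},
        person,
    ]
    for language in person.get("languages", []):
        # Using the language as _id means re-indexing an already known language
        # just overwrites the same document instead of creating a duplicate.
        lines.append({"index": {"_index": "languages", "_id": language}})
        lines.append({"value": language})
    # The bulk API expects newline-delimited JSON with a trailing newline.
    return "\n".join(json.dumps(line) for line in lines) + "\n"


body = build_bulk_body("1", {"name": "John Doe", "languages": ["english", "spanish"]})
print(body)
```

Unlike the transform, this keeps the languages index in sync at write time, but it never removes a language when the last person speaking it is deleted, so treat it as a sketch of the idea, not a drop-in replacement.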