elasticsearch elasticsearch-aggregation elasticsearch-dsl-py

How to aggregate until a certain value is reached in ElasticSearch?


I would like to aggregate a list of documents (each of which has two fields, timestamp and amount) by the "amount" field until a certain value is reached. For example, I would like to get the list of documents, sorted by timestamp, whose total amount is equal to 100. Is it possible to do this in one query?

Here is my query, which returns the total amount. I would like to add a condition to it that stops the aggregation once a certain value is reached.

{
  "size": 10000,
  "query": {
    "bool": {
      "filter": [
        { "range": { "timestamp": { "gte": 1525168583 } } }
      ]
    }
  },
  "sort": ["timestamp"],
  "aggs": {
    "total_amount": {
      "sum": { "field": "amount" }
    }
  }
}

Thank You


Solution

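  • It's possible using a filter aggregation for the range (gte) query combined with a scripted_metric aggregation that limits the summation to a certain amount. Note that wrapping the query in function_score does not mimic sorting here: the filter aggregation runs it in filter context, so no scores are computed, and scripted_metric visits the documents in index order rather than in score or sort order.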

    Let's first set up a mapping and ingest some docs:

    # Create the index; "timestamp" holds epoch seconds. The other fields
    # ("context", "amount") are left to dynamic mapping.
    PUT summation
    {
      "mappings": {
        "properties": {
          "timestamp": { "type": "date", "format": "epoch_second" }
        }
      }
    }

    # Index the four sample documents in a single request (auto-generated IDs).
    POST summation/_bulk
    { "index": {} }
    { "context": "newest", "timestamp": 1587049128, "amount": 20 }
    { "index": {} }
    { "context": "2nd newest", "timestamp": 1586049128, "amount": 30 }
    { "index": {} }
    { "context": "3rd newest", "timestamp": 1585049128, "amount": 40 }
    { "index": {} }
    { "context": "4th newest", "timestamp": 1585049128, "amount": 30 }
    

    Then perform the query:

    # Sum "amount" over the documents with timestamp >= 1585049128, oldest first,
    # and stop before the running total would exceed the limit (100).
    #
    # The ordering is done inside reduce_script. A function_score clause cannot
    # provide it: a filter aggregation runs its query in filter context, so no
    # scores are computed, and map_script sees the documents in index order.
    #
    # Every matching document is kept in memory until the reduce phase, so keep
    # the range filter as narrow as possible on large indices.
    GET summation/_search
    {
      "size": 0,
      "aggs": {
        "filtered_agg": {
          "filter": {
            "range": {
              "timestamp": {
                "gte": 1585049128
              }
            }
          },
          "aggs": {
            "limited_sum": {
              "scripted_metric": {
                "init_script": "state.docs = [];",
                "map_script": """
                  // Only collect here; the order of visits is not the sort order.
                  state.docs.add([
                    'timestamp': doc['timestamp'].value.toInstant().toEpochMilli(),
                    'amount': doc['amount'].value,
                    'context': doc['context.keyword'].value
                  ]);
                """,
                "combine_script": "return state.docs",
                "reduce_script": {
                  "params": {
                    "limit": 100
                  },
                  "source": """
                    // Merge the per-shard lists so the result is correct on any number of shards.
                    List all = new ArrayList();
                    for (shardDocs in states) {
                      if (shardDocs != null) {
                        all.addAll(shardDocs);
                      }
                    }

                    // Oldest first; swap a and b for newest first.
                    // The sort is stable, so equal timestamps keep their collection order.
                    all.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));

                    // Accumulate until the next document would push the total past the limit.
                    def sum = 0;
                    List picked = new ArrayList();
                    for (entry in all) {
                      if (sum + entry.amount > params.limit) {
                        break;
                      }
                      sum += entry.amount;
                      picked.add(entry.context);
                    }

                    Map result = new HashMap();
                    result.put('sum', sum);
                    result.put('docs', picked);
                    return result;
                  """
                }
              }
            }
          }
        }
      }
    }
    

    yielding (the oldest documents first; "newest" is left out because adding its 20 would exceed 100)

    {
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [ ]
      },
      "aggregations" : {
        "filtered_agg" : {
          "meta" : { },
          "doc_count" : 4,
          "limited_sum" : {
            "value" : {
              "docs" : [
                "3rd newest",
                "4th newest",
                "2nd newest"
              ],
              "sum" : 100
            }
          }
        }
      }
    }
    

    I've chosen here to return only each document's context value, but you can adjust it to retrieve whatever you like, be it IDs, amounts, etc.