elasticsearch, elasticsearch-aggregation, aws-elasticsearch

Alternative solution to Cumulative Cardinality Aggregation in Elasticsearch


I'm running an Elasticsearch cluster on AWS that doesn't have access to X-Pack, but I'd still like to run a cumulative cardinality aggregation to determine the daily counts of new users to my site.

Is there an alternate solution to this problem?

For example, how could I transform:

GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users" 
          }
        }
      }
    }
  }
}

to produce the same result without cumulative_cardinality?


Solution

  • Cumulative cardinality was added precisely for that reason -- it wasn't easily calculable before...

    As with almost anything in Elasticsearch, though, there's a script to get it done for ya. Here's my take on it.

    1. Set up an index
    PUT user_hits
    {
      "mappings": {
        "properties": {
          "timestamp": {
            "type": "date",
            "format": "yyyy-MM-dd"
          },
          "user_id": {
            "type": "keyword"
          }
        }
      }
    }
    
    2. Add one user on the first day and two on the next, one of whom is not strictly 'new'.
    POST user_hits/_doc
    {"user_id":1,"timestamp":"2020-10-01"}
    
    POST user_hits/_doc
    {"user_id":1,"timestamp":"2020-10-02"}
    
    POST user_hits/_doc
    {"user_id":3,"timestamp":"2020-10-02"}
    
    3. Mock a date histogram using a parametrized start date plus a number of days, group the users accordingly, and then compare each day's result with the previous days'
    GET /user_hits/_search
    {
      "size": 0,
      "query": {
        "range": {
          "timestamp": {
            "gte": "2020-10-01"
          }
        }
      }, 
      "aggs": {
        "new_users_count_vs_prev_day": {
          "scripted_metric": {
            "init_script": """
              state.by_day_map = [:];
              state.start_millis = new SimpleDateFormat("yyyy-MM-dd").parse(params.start_date).getTime();
              state.day_millis = 24 * 60 * 60 * 1000;
              state.dt_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
            """,
            "map_script": """
              for (def step = 1; step < params.num_of_days + 1; step++) {
                def timestamp = doc['timestamp'].value.millis;
                def user_id = doc['user_id'].value;
                def anchor = state.start_millis + (step * state.day_millis);
                // add a `n__` prefix to more easily sort the resulting map later on
                def anchor_pretty = step + '__' + state.dt_formatter.format(Instant.ofEpochMilli(anchor));
                
                if (timestamp <= anchor) {
                  if (state.by_day_map.containsKey(anchor_pretty)) {
                    state.by_day_map[anchor_pretty].add(user_id);
                  } else {
                    state.by_day_map[anchor_pretty] = [user_id];
                  }
                }
              }
            """,
            "combine_script": """
                List keys = new ArrayList(state.by_day_map.keySet());
                Collections.sort(keys);
              
                def unique_sorted_map = new TreeMap();
                def unique_from_prev_day = [];
                
                for (def key : keys) { 
                  def unique_users_per_day = new HashSet(state.by_day_map.get(key));
                  
                  unique_users_per_day.removeIf(user -> unique_from_prev_day.contains(user));
                  
                   // remove the `n__` prefix (note: substring(3) assumes a
                   // single-digit step, i.e. num_of_days <= 9)
                   unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
                   unique_from_prev_day.addAll(unique_users_per_day);
                }
                return unique_sorted_map;
            """,
            "reduce_script": "return states",
            "params": {
              "start_date": "2020-10-01",
              "num_of_days": 5
            }
          }
        }
      }
    }
    

    yielding

    "aggregations" : {
      "new_users_count_vs_prev_day" : {
        "value" : [
          {
            "2020-10-01" : 1,    <-- 1 new unique user            
            "2020-10-02" : 1,    <-- another new unique user
            "2020-10-03" : 0,
            "2020-10-04" : 0,
            "2020-10-05" : 0
          }
        ]
      }
    }
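
    To sanity-check the combine_script's day-over-day dedup outside the cluster, here's a small Python equivalent. The function name and the in-memory input shape (a date-to-user-IDs mapping) are mine for illustration; the logic mirrors the script: each day's users minus everyone already seen on earlier days.

```python
def new_users_per_day(hits, days):
    """Count, per day, the user IDs never seen on any earlier day.

    hits: dict mapping a date string to the list of user IDs active that day
    days: ordered list of date strings to report (days with no hits count 0)
    """
    seen = set()
    result = {}
    for day in days:
        users = set(hits.get(day, []))
        new = users - seen          # users not seen on any previous day
        result[day] = len(new)
        seen |= new                 # carry them forward, like unique_from_prev_day
    return result


# The three sample documents from step 2 (user_id is a keyword, hence strings):
hits = {
    "2020-10-01": ["1"],
    "2020-10-02": ["1", "3"],
}
days = ["2020-10-01", "2020-10-02", "2020-10-03", "2020-10-04", "2020-10-05"]
print(new_users_per_day(hits, days))
# {'2020-10-01': 1, '2020-10-02': 1, '2020-10-03': 0, '2020-10-04': 0, '2020-10-05': 0}
```

    The counts match the aggregation response above: one new user on day one, one more on day two, then zero.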
    

    The script is guaranteed to be slow, but it has one potentially quite useful advantage: you can adjust it to return the full list of new user IDs, not just their count. The cumulative cardinality aggregation cannot give you that -- according to its implementation's author, it only works in a sequential, cumulative manner by design.
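
    In Python terms, the adjustment described above is a one-line change: keep the per-day set of new IDs instead of taking its size. The function name and input shape are again mine, not part of any Elasticsearch API.

```python
def new_user_ids_per_day(hits, days):
    """Return, per day, the sorted list of user IDs never seen on earlier days.

    Same dedup as the counting version, but the IDs themselves are kept.
    """
    seen = set()
    result = {}
    for day in days:
        new = set(hits.get(day, [])) - seen
        result[day] = sorted(new)   # keep the IDs instead of len(new)
        seen |= new
    return result


print(new_user_ids_per_day(
    {"2020-10-01": ["1"], "2020-10-02": ["1", "3"]},
    ["2020-10-01", "2020-10-02"],
))
# {'2020-10-01': ['1'], '2020-10-02': ['3']}
```

    In the scripted_metric itself the equivalent change is made in the combine_script: put `unique_users_per_day` into `unique_sorted_map` rather than `unique_users_per_day.size()`.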