I'm running an Elasticsearch cluster on AWS that doesn't have access to X-Pack,
but I'd still like to use a cumulative cardinality aggregation
to determine the daily counts of new users to my site.
Is there an alternate solution to this problem?
For example, how could I transform:
GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users"
          }
        }
      }
    }
  }
}
to produce the same result without cumulative_cardinality?
Cumulative cardinality was added precisely because this wasn't easily calculable before.
As with almost anything in Elasticsearch, though, there's a script to get it done. Here's my take on it.
PUT user_hits
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "user_id": {
        "type": "keyword"
      }
    }
  }
}
POST user_hits/_doc
{"user_id":"1","timestamp":"2020-10-01"}
POST user_hits/_doc
{"user_id":"1","timestamp":"2020-10-02"}
POST user_hits/_doc
{"user_id":"3","timestamp":"2020-10-02"}
GET /user_hits/_search
{
  "size": 0,
  "query": {
    "range": {
      "timestamp": {
        "gte": "2020-10-01"
      }
    }
  },
  "aggs": {
    "new_users_count_vs_prev_day": {
      "scripted_metric": {
        "init_script": """
          state.by_day_map = [:];
          // parse the start date as UTC midnight; SimpleDateFormat would use the
          // JVM's default time zone and could shift every bucket by a few hours
          state.start_millis = Instant.parse(params.start_date + "T00:00:00Z").toEpochMilli();
          state.day_millis = 24 * 60 * 60 * 1000;
          state.dt_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
        """,
        "map_script": """
          def timestamp = doc['timestamp'].value.millis;
          def user_id = doc['user_id'].value;
          for (def step = 1; step < params.num_of_days + 1; step++) {
            // anchor on the last millisecond of day `step` so that intraday
            // timestamps are counted towards the day they fall on
            def anchor = state.start_millis + (step * state.day_millis) - 1;
            // add a step-number prefix to more easily sort the resulting map later on
            def anchor_pretty = step + '__' + state.dt_formatter.format(Instant.ofEpochMilli(anchor));
            // a user counts towards a given day if seen on or before it
            if (timestamp <= anchor) {
              if (state.by_day_map.containsKey(anchor_pretty)) {
                state.by_day_map[anchor_pretty].add(user_id);
              } else {
                state.by_day_map[anchor_pretty] = [user_id];
              }
            }
          }
        """,
        "combine_script": """
          List keys = new ArrayList(state.by_day_map.keySet());
          Collections.sort(keys);
          def unique_sorted_map = new TreeMap();
          // users already counted on an earlier day; a set keeps lookups O(1)
          def seen_users = new HashSet();
          for (def key : keys) {
            def unique_users_per_day = new HashSet(state.by_day_map.get(key));
            unique_users_per_day.removeIf(user -> seen_users.contains(user));
            // strip the `n__` prefix (single-digit steps only; zero-pad for 10+ days)
            unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
            seen_users.addAll(unique_users_per_day);
          }
          return unique_sorted_map;
        """,
        "reduce_script": "return states",
        "params": {
          "start_date": "2020-10-01",
          "num_of_days": 5
        }
      }
    }
  }
}
yielding
"aggregations" : {
"new_users_count_vs_prev_day" : {
"value" : [
{
"2020-10-01" : 1, <-- 1 new unique user
"2020-10-02" : 1, <-- another new unique user
"2020-10-03" : 0,
"2020-10-04" : 0,
"2020-10-05" : 0
}
]
}
}
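One caveat: the combine_script runs once per shard, so the deduplication above is only complete on a single-shard index like this toy example. On a multi-shard index you'd return state.by_day_map from the combine_script unchanged and do the merging, plus the sort-and-dedupe pass, in the reduce_script instead. A rough sketch of just the merge step, assuming that change to the combine_script:
"reduce_script": """
  // merge the per-shard day -> users maps before deduplicating across days
  def merged = new TreeMap();
  for (def shard_state : states) {
    if (shard_state == null) continue; // shards that saw no matching docs
    for (def entry : shard_state.entrySet()) {
      if (merged.containsKey(entry.getKey())) {
        merged[entry.getKey()].addAll(entry.getValue());
      } else {
        merged[entry.getKey()] = new HashSet(entry.getValue());
      }
    }
  }
  // ...then run the same sort + remove-already-seen pass as in the combine script
  return merged;
"""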
The script is guaranteed to be slow, but it has one potentially quite useful advantage: you can adjust it to return the full lists of new user IDs, not just the counts. That's something you won't get from cumulative_cardinality which, according to its implementation's author, only works in a sequential, cumulative manner by design.
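For instance, returning the IDs themselves is a one-line change in the combine_script above, swapping the count for the users themselves (as a list, since lists serialize cleanly in the response):
// keep the new users themselves rather than just how many there were
unique_sorted_map.put(key.substring(3), new ArrayList(unique_users_per_day));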