
Elasticsearch: mapping the result of collapse / doing operations on grouped documents


There is a list of conversations, and every conversation has a list of messages. Every message has several fields, including an action field. In the first messages of a conversation the action is A; after a few messages the action becomes A.1, later A.1.1, and so on (the actions come from a list of chatbot intents).

Listing the actions of a conversation's messages in order gives something like: A > A > A > A.1 > A > A.1 > A.1.1 ...

Problem:

I need to create a report using Elasticsearch that returns the actions group of every conversation; next, I need to group identical actions groups and count them. The end result should be a Map<actionsGroup, count> with entries such as ('A > A.1 > A > A.1 > A.1.1', 3).

When constructing the actions group, I need to collapse every run of consecutive duplicates: instead of A > A > A > A.1 > A > A.1 > A.1.1 I need A > A.1 > A > A.1 > A.1.1.
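Removing runs of consecutive duplicates (not every duplicate, since A may legitimately reappear after A.1) can be sketched in TypeScript as follows. The function name `collapseConsecutive` is hypothetical, and the ` > ` separator simply mirrors the notation used above.

```typescript
// Hypothetical helper: drop an action when it repeats the one right before it.
// Non-adjacent repeats (A > A.1 > A) are kept, matching the desired output above.
function collapseConsecutive(actions: string[]): string[] {
  const result: string[] = [];
  for (const action of actions) {
    if (result.length === 0 || result[result.length - 1] !== action) {
      result.push(action);
    }
  }
  return result;
}

const raw = ["A", "A", "A", "A.1", "A", "A.1", "A.1.1"];
console.log(collapseConsecutive(raw).join(" > "));
// prints "A > A.1 > A > A.1 > A.1.1"
```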

What I have started with:

{
   "collapse":{
      "field":"context.conversationId",
      "inner_hits":{
         "name":"logs",
         "size": 10000,
         "sort":[
            {
               "@timestamp":"asc"
            }
         ]
      }
   },
   "aggs":{
   }
}

What I need next:

  1. I need to map the result of the collapse into a single value like A > A.1 > A > A.1 > A.1.1. I've seen that aggregations can run scripts over their results, which would let me build the list of actions I need, but an aggregation runs over all matching messages, not only over the grouped messages returned by collapse. Is it possible to use an aggregation inside collapse, or is there a similar solution?
  2. I need to group the resulting values (A > A.1 > A > A.1 > A.1.1) from all collapsed groups, count them, and return a Map<actionsGroup, count>.
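If the collapse query is kept, both steps can be done on the client once the search returns. The sketch below assumes the response shape of a collapse request whose `inner_hits` block is named `logs` (as in the query above), so each top-level hit carries `inner_hits.logs.hits.hits[]._source.context.action`, already sorted by `@timestamp`. The type and function names are hypothetical. Two caveats: collapse is paginated like a normal search (default `size` is 10 collapsed hits), so every page must be processed for the counts to cover all conversations; and the inner-hits `size` is capped by the `index.max_inner_result_window` index setting (default 100), so a `size` of 10000 would likely require raising that setting.

```typescript
// Minimal shapes of the parts of a collapse + inner_hits response read below.
interface MessageSource {
  "@timestamp": number;
  context: { action: string; conversationId: string };
}
interface InnerHits {
  logs: { hits: { hits: { _source: MessageSource }[] } };
}
interface CollapseResponse {
  hits: { hits: { inner_hits: InnerHits }[] };
}

// Drop an action when it repeats the previous one (A > A > A.1 becomes A > A.1).
function collapseRuns(actions: string[]): string[] {
  return actions.filter((action, i) => i === 0 || actions[i - 1] !== action);
}

// Build one path per collapsed conversation, then count identical paths.
function countActionPaths(response: CollapseResponse): Map<string, number> {
  const counts = new Map<string, number>();
  for (const group of response.hits.hits) {
    // the query already sorts the inner hits by @timestamp ascending
    const actions = group.inner_hits.logs.hits.hits.map(h => h._source.context.action);
    const path = collapseRuns(actions).join(" > ");
    counts.set(path, (counts.get(path) ?? 0) + 1);
  }
  return counts;
}

// Mock response built from the sample conversations in this question.
const conv = (id: string, actions: string[]) => ({
  inner_hits: { logs: { hits: { hits: actions.map((action, i) => ({
    _source: { "@timestamp": 1579632745000 + i, context: { action, conversationId: id } },
  })) } } },
});
const response: CollapseResponse = { hits: { hits: [
  conv("conv_id1", ["A", "A.1", "A.1.1"]),
  conv("conv_id2", ["A", "A.1", "A.1.1"]),
  conv("conv_id3", ["B", "B.1"]),
] } };

countActionPaths(response).forEach((count, path) => console.log(path, count));
```

Keeping the deduplication and counting on the client avoids scripting entirely, at the cost of transferring every message of every conversation.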

Or:

  1. Group the conversation messages by the conversationId field using an aggregation (I don't know how to do this).
  2. Use a script to iterate over all values and build the actions group for every conversation (not sure if this is possible).
  3. Use another aggregation over all values to group the duplicates, returning a Map<actionsGroup, count>.
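For this second approach, step 1 is commonly done with a `terms` aggregation on `context.conversationId` plus a `top_hits` sub-aggregation sorted by `@timestamp`; in the sketch below, steps 2 and 3 then run on the client. The request is written as a TypeScript object literal, and the aggregation names `conversations` and `messages` as well as both `size` values are assumptions. `top_hits` is also capped by `index.max_inner_result_window` (default 100), and `terms` only returns its top `size` buckets, so with many conversations a `composite` aggregation paged with `after_key` may be a better fit.

```typescript
// Hypothetical request body: one terms bucket per conversation, each holding
// its messages (oldest first) via a top_hits sub-aggregation.
const conversationsRequest = {
  size: 0,
  aggs: {
    conversations: {
      terms: { field: "context.conversationId", size: 1000 }, // assumed upper bound
      aggs: {
        messages: {
          top_hits: {
            size: 100, // default maximum allowed by index.max_inner_result_window
            sort: [{ "@timestamp": { order: "asc" } }],
            _source: { includes: ["context.action"] },
          },
        },
      },
    },
  },
};

// Only the response fields read below are modelled.
interface Bucket {
  key: string;
  messages: { hits: { hits: { _source: { context: { action: string } } }[] } };
}
interface AggResponse {
  aggregations: { conversations: { buckets: Bucket[] } };
}

function countPathsFromBuckets(response: AggResponse): Map<string, number> {
  const counts = new Map<string, number>();
  for (const bucket of response.aggregations.conversations.buckets) {
    const actions = bucket.messages.hits.hits.map(h => h._source.context.action);
    // drop consecutive duplicates, then join into a single path string
    const path = actions.filter((a, i) => i === 0 || actions[i - 1] !== a).join(" > ");
    counts.set(path, (counts.get(path) ?? 0) + 1);
  }
  return counts;
}

// Mock aggregation response (hypothetical data) to exercise the parser.
const makeBucket = (key: string, actions: string[]): Bucket => ({
  key,
  messages: { hits: { hits: actions.map(action => ({ _source: { context: { action } } })) } },
});
const sample: AggResponse = { aggregations: { conversations: { buckets: [
  makeBucket("conv_id1", ["A", "A", "A.1", "A.1.1"]),
  makeBucket("conv_id2", ["A", "A.1", "A.1.1"]),
  makeBucket("conv_id3", ["B", "B.1"]),
] } } };

console.log(JSON.stringify(conversationsRequest));
countPathsFromBuckets(sample).forEach((count, path) => console.log(path, count));
```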

Mappings:

"mappings":{
  "properties":{
     "@timestamp":{
        "type":"date",
        "format": "epoch_millis"
      },
     "context":{
        "properties":{
           "action":{
              "type":"keyword"
           },
           "conversationId":{
              "type":"keyword"
           }
        }
     }
  }
}

Sample documents of the conversations:

Conversation 1.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id1"
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id1"
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id1"
    }
}

Conversation 2.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id2"
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id2"
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id2"
    }
}

Conversation 3.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "B",
        "conversationId": "conv_id3"
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "B.1",
        "conversationId": "conv_id3"
    }
}

Expected result:

{
    "A -> A.1 -> A.1.1": 2,
    "B -> B.1": 1
}
Something similar; this or any other format works.

Solution

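  • I solved it using Elasticsearch's scripted_metric aggregation. The index was also changed from its initial state, so the field names in the script differ from the mapping above: the script keeps the intentsHistoryPath of the latest message of each conversation and counts identical paths.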

    The script:

    {
       "size": 0,
       "aggs": {
            "intentPathsCountAgg": {
                "scripted_metric": {
                    "init_script": "state.messagesList = new ArrayList();",
                    "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'].value, 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",
                    "combine_script": "return state",
                    "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
                }
            }
        }
    }
    

    The same script, formatted for readability (as a TypeScript object literal):

    scripted_metric: {
      init_script: 'state.messagesList = new ArrayList();',
      map_script: `
        long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
        Map currentMessage = [
          'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'].value,
          'time': currentMessageTime,
          'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
        ];
        state.messagesList.add(currentMessage);`,
      combine_script: 'return state',
      reduce_script: `
        List messages = new ArrayList();
        Map conversationsMap = new HashMap();
        Map intentsMap = new HashMap();
        boolean[] ifElseWorkaround = new boolean[1];
    
        for (state in states) {
          messages.addAll(state.messagesList);
        }
    
        messages.stream().forEach(message -> {
          Map existingMessage = conversationsMap.get(message.conversationId);
          if(existingMessage == null || message.time > existingMessage.time) {
            conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
          } else {
            ifElseWorkaround[0] = true;
          }
        });
    
        conversationsMap.entrySet().forEach(conversation -> {
          if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
            long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
            intentsMap.put(conversation.getValue().intentsPath, intentsCount);
          } else {
            intentsMap.put(conversation.getValue().intentsPath, 1L);
          }
        });
    
        return intentsMap.entrySet().stream().map(intentPath -> [
          intentPath.getKey().toString(): intentPath.getValue()
        ]).collect(Collectors.toSet())`
    }
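To check the reduce logic outside Elasticsearch, here is a TypeScript sketch of what reduce_script does: keep the most recent message of each conversation, take its intents path, and count identical paths. The `Message` interface is a simplification of the map assembled in map_script, and the sample data is hypothetical, not taken from the response below.

```typescript
// Simplified form of the map that map_script adds to state.messagesList.
interface Message {
  conversationId: string;
  time: number; // epoch millis
  intentsPath: string;
}

// Mirrors reduce_script: the latest message per conversation wins,
// then identical intents paths are counted.
function countIntentPaths(messages: Message[]): Map<string, number> {
  const latest = new Map<string, Message>();
  for (const m of messages) {
    const existing = latest.get(m.conversationId);
    if (existing === undefined || m.time > existing.time) {
      latest.set(m.conversationId, m);
    }
  }
  const counts = new Map<string, number>();
  latest.forEach(m => counts.set(m.intentsPath, (counts.get(m.intentsPath) ?? 0) + 1));
  return counts;
}

// Hypothetical messages from three conversations.
const msgs: Message[] = [
  { conversationId: "c1", time: 1, intentsPath: "smallTalk.greet" },
  { conversationId: "c1", time: 2, intentsPath: "smallTalk.greet -> smallTalk.greet2" },
  { conversationId: "c2", time: 1, intentsPath: "smallTalk.greet -> smallTalk.greet2" },
  { conversationId: "c3", time: 5, intentsPath: "smallTalk.greet" },
];

countIntentPaths(msgs).forEach((count, path) => console.log(path, count));
```

The earlier "smallTalk.greet" message of c1 is ignored because a later message of the same conversation exists, which is why only c3 contributes to that path.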
    

    The response:

    {
        "took": 2,
        "timed_out": false,
        "_shards": {
            "total": 5,
            "successful": 5,
            "skipped": 0,
            "failed": 0
        },
        "hits": {
            "total": {
                "value": 11,
                "relation": "eq"
            },
            "max_score": null,
            "hits": []
        },
        "aggregations": {
            "intentPathsCountAgg": {
                "value": [
                    {
                        "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
                    },
                    {
                        "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3  -> smallTalk.greet4": 1
                    },
                    {
                        "smallTalk.greet -> smallTalk.greet2": 1
                    }
                ]
            }
        }
    }