ElasticSearch сопоставляет результат операций сворачивания/выполнения над сгруппированными документами

Существует список бесед, и у каждой беседы есть список сообщений. Каждое сообщение имеет разные поля и поле action. Нужно учитывать, что в первых сообщениях беседы используется действие A, через несколько сообщений используется действие A.1, через некоторое время A.1.1 и так далее (есть список намерений чатбота).

Группировка действий сообщений в беседе будет выглядеть примерно так: A > A > A > A.1 > A > A.1 > A.1.1 ...

Проблема:

Мне нужно создать отчет с помощью ElasticSearch, который будет возвращать actions group каждого разговора; затем мне нужно сгруппировать похожие actions groups, добавив количество; в итоге получится Map<actionsGroup, count> как 'A > A.1 > A > A.1 > A.1.1', 3.

При построении actions group мне нужно исключить все группы дубликатов; Вместо A > A > A > A.1 > A > A.1 > A.1.1 мне нужно иметь A > A.1 > A > A.1 > A.1.1.

Шаги, которые я начал делать:

{
   "collapse":{
      "field":"context.conversationId",
      "inner_hits":{
         "name":"logs",
         "size": 10000,
         "sort":[
            {
               "@timestamp":"asc"
            }
         ]
      }
   },
   "aggs":{
   },
}

Что мне нужно дальше:

  1. Мне нужно сопоставить результат коллапса с одним результатом, например A > A.1 > A > A.1 > A.1.1. Я видел, что в случае aggr можно использовать скрипты над результатом и есть возможность создать список действий, как мне нужно, но aggr выполняет операции над всеми сообщениями, а не только над сгруппированными сообщениями что у меня в развале. Можно ли использовать aggr внутри коллапса или подобное решение?
  2. Мне нужно сгруппировать полученные значения (A > A.1 > A > A.1 > A.1.1) из всех коллапсов, добавить счетчик и получить Map<actionsGroup, count>.

Или:

  1. Сгруппируйте сообщения разговоров по полю conversationId, используя aggr (я не знаю, как это сделать)
  2. Используйте скрипт для повторения всех значений и создания actions group для каждого диалога. (не уверен, что это возможно)
  3. Используйте еще один aggr для всех значений и сгруппируйте дубликаты, возвращая Map<actionsGroup, count>.

Обновление 2: мне удалось получить частичный результат, но осталась одна проблема. Пожалуйста, проверьте здесь, что мне еще нужно исправить.

Обновление 1: добавлены некоторые дополнительные сведения.

Сопоставления:

"mappings":{
  "properties":{
     "@timestamp":{
        "type":"date",
        "format": "epoch_millis"
     }
     "context":{
        "properties":{
           "action":{
              "type":"keyword"
           },
           "conversationId":{
              "type":"keyword"
           }
        }
     }
  }
}

Образцы документов разговоров:

Conversation 1.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id1",
    }
}

Conversation 2.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id2",
    }
}

Conversation 3.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "B",
        "conversationId": "conv_id3",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "B.1",
        "conversationId": "conv_id3",
    }
}

Ожидаемый результат:

{
    "A -> A.1 -> A.1.1": 2,
    "B -> B.1": 1
}
Something similar, having this or any other format.

Поскольку я новичок в elasticsearch, каждый совет приветствуется.


person Radu Linu    schedule 12.03.2020    source источник
comment
Вы можете использовать агрегацию терминов для группировки по ключу. Если вы можете добавить сопоставление, образец документа и ожидаемый результат, вам будет легче понять проблему.   -  person jaspreet chahal    schedule 12.03.2020
comment
@jaspreetchahal Я добавил некоторые дополнительные детали.   -  person Radu Linu    schedule 12.03.2020
comment
Вам нужно количество разговоров ex convs1-›2, convs2-›1 ? Какова роль инициативной группы здесь?   -  person jaspreet chahal    schedule 12.03.2020
comment
Нет. Мне нужно количество разговоров actions group. Как и у каждого разговора есть список действий A -> A.1 -> A.1.1, это actions group; Мне нужно знать количество actions group.   -  person Radu Linu    schedule 12.03.2020
comment
Я бы посоветовал вам отказаться от агрегации и написать полный сценарий самостоятельно. elastic.co/guide/en/elasticsearch/ ссылка/мастер/   -  person Alexander Mikhalchenko    schedule 15.03.2020
comment
Большое спасибо! Вот так я решил свою проблему.   -  person Radu Linu    schedule 17.03.2020


Ответы (2)


Я решил это, используя scripted_metric эластичности. Кроме того, index было изменено по сравнению с исходным состоянием.

Сценарий:

{
   "size": 0,
   "aggs": {
        "intentPathsCountAgg": {
            "scripted_metric": {
                "init_script": "state.messagesList = new ArrayList();",
                "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",  
                "combine_script": "return state",
                "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
            }
        }
    }
}

Отформатированный скрипт (для лучшей читабельности - с использованием .ts):

scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];

    for (state in states) {
      messages.addAll(state.messagesList);
    }

    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if(existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });

    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });

    return intentsMap.entrySet().stream().map(intentPath -> [
      'path': intentPath.getKey().toString(),
      'count': intentPath.getValue()
    ]).collect(Collectors.toSet())`

Ответ:

{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 11,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "intentPathsCountAgg": {
            "value": [
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
                },
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3  -> smallTalk.greet4": 1
                },
                {
                    "smallTalk.greet -> smallTalk.greet2": 1
                }
            ]
        }
    }
}
person Radu Linu    schedule 17.03.2020

Использование скрипта в Агрегация терминов мы можем создавать корзины по первому символу "context.action". Используя аналогичную субагрегацию терминов, мы можем получить все «context.action» в родительском сегменте ex A-> A.1-> A.1.1...

Запрос:

{
  "size": 0,
  "aggs": {
    "conversations": {
      "terms": {
        "script": {
          "source": "def term=doc['context.action'].value; return term.substring(0,1);" 
--->  returns first character ex A,B,C etc
        },
        "size": 10
      },
      "aggs": {
        "sub_conversations": {
          "terms": {
            "script": {
              "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"--> All context.action under [A], length check to ignore [A]
            },
            "size": 10
          }
        },
        "count": {
          "cardinality": {
            "script": {
              "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"--> count of all context.action under A
            }
          }
        }
      }
    }
  }
}

Так как в эластичном поиске невозможно объединить разные документы. вам нужно будет получить комбинированный ключ на стороне клиента, перебирая ведро агрегации.

Результат:

  "aggregations" : {
    "conversations" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "A",
          "doc_count" : 6,
          "sub_conversations" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "A.1",
                "doc_count" : 2
              },
              {
                "key" : "A.1.1",
                "doc_count" : 2
              }
            ]
          },
          "count" : {
            "value" : 2
          }
        },
        {
          "key" : "B",
          "doc_count" : 2,
          "sub_conversations" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "B.1",
                "doc_count" : 1
              }
            ]
          },
          "count" : {
            "value" : 1
          }
        }
      ]
    }
  }
person jaspreet chahal    schedule 12.03.2020
comment
Это не совсем то, что мне нужно. Из-за производительности мне нужно получить почти полный результат от elasticsearch. - person Radu Linu; 12.03.2020
comment
Мне удалось получить частичный результат для моей проблемы, но все же есть одна проблема. Пожалуйста, проверьте этот пост, если у вас есть какие-либо идеи. stackoverflow.com/questions/60662222 - person Radu Linu; 13.03.2020