ElasticSearch Ingest Pipeline: создание и обновление поля метки времени

Чтобы создать поле метки времени в моих индексах, в соответствии с этим ответом, я создал Ingest Pipeline для запуска определенных индексов:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2"
              ].contains(ctx['_index'])) { return; }
          
          // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          
        """
      }
    }
  ]
}

затем я применяю ко всем настройкам индексов как конвейер по умолчанию

PUT _all/_settings
{
  "index": {
    "default_pipeline": "auto_now_add"
  }
}

После этого я начинаю индексировать свои объекты в эти индексы. Когда я запрашиваю проиндексированный элемент, я получу этот элемент с полем updated_at, обновленным во время индексации, например:

{
  _index: 'my_index_1',
  _type: '_doc',
  _id: 'r1285044056',
  _version: 11,
  _seq_no: 373,
  _primary_term: 2,
  found: true,
  _source: {
    updated_at: '2021-07-07 04:35:39',
    ...
  }
}

Теперь я хотел бы иметь поле created_at, которое обновляется только в первый раз, поэтому я попытался обновить скрипт выше следующим образом:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2",
                 "..."
              ].contains(ctx['_index'])) { return; }
          
           // always update updated_at
          ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          // don't overwrite if present
          if (ctx != null && ctx['created_at'] != null) { return; }
          
          ctx['created_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        """
      }
    }
  ]
}

но это решение, похоже, не работает: условие

if (ctx != null && ctx['created_at'] != null) { return; }

всегда будет давать сбой, что приведет к обновлению created_at при каждом обновлении объекта в индексе, точно так же, как и поле updated_at, что сделает его бесполезным. Итак, как предотвратить это и убедиться, что это поле created_at существует после того, как оно было создано конвейером загрузки?


person loretoparisi    schedule 07.07.2021    source источник
comment
Возможно, вам следует попробовать использовать doc['created_at'], чтобы проверить, является ли он нулевым или нет, а не ctx! Можешь попробовать?   -  person Aravind    schedule 07.07.2021
comment
спасибо, как получить доступ к doc в скрипте? Я не могу использовать напрямую if (doc['created_at'] != null) { return; } также синтаксис ctx._source, который не поддерживается в процессорах, так как должен использоваться синтаксис ctx[my_field].   -  person loretoparisi    schedule 07.07.2021


Ответы (1)


Как описано @Val в этом ответе:

... процессор(ы) конвейера загрузки будут работать только в контексте отправляемого документа, а не сохраненного (если он есть).

Таким образом, у вас не будет доступа к базовым _source и doc, поскольку конвейеры загрузки были разработаны для фазы ingest, а не для фазы обновления.


Конечно, вы можете оставить свой конвейер auto_now_add для автоматического добавления updated_at, и вы можете расширить его с помощью created_at (если он еще не присутствует в полезной нагрузке загрузки), проверив ctx.containsKey — поскольку ctx по сути является java Map:

PUT _ingest/pipeline/auto_now_add
{
  "description": "Assigns the current date if not yet present and if the index name is whitelisted",
  "processors": [
    {
      "script": {
        "source": """
          // skip if not whitelisted
          if (![ "my_index_1",
                 "my_index_2",
                 "..."
              ].contains(ctx['_index'])) { return; }
          
          def now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
          
          // guaranteee updated_at
          ctx['updated_at'] = now;
          
          // add created_at only if nonexistent in the payload
          if (!ctx.containsKey('created_at')) {
            ctx['created_at'] = now;
          }  
        """
      }
    }
  ]
}

Однако это сработает только в первый раз, когда вы загружаете документ!

Бег:

POST my_index_1/_doc/some_id
{ 
  "some": "param"
}

будет давать:

{
  "some" : "param",
  "updated_at" : "2021-07-08 10:35:13",
  "created_at" : "2021-07-08 10:35:13"
}

Теперь, чтобы автоматически увеличивать updated_at при каждом обновлении документа, вам понадобится еще один скрипт — на этот раз он будет храниться в _scripts, а не в _ingest/pipeline< /эм>:

PUT _scripts/incement_update_at__plus_new_params
{
  "script": {
    "lang": "painless", 
    "source": """
      // add whatever is in the params
      ctx._source.putAll(params);
      
      // increment updated_at no matter what was in the params
      ctx._source['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    """
  }
}

Затем, выполняя вызов _update, сделайте это, упомянув вышеупомянутый script:

POST my_index_1/_doc/some_id/_update
{
  "script": {
    "id": "incement_update_at__plus_new_params",
    "params": {
      "your": "new params"
    }
  }
}

который увеличит updated_at, не касаясь created_at, и добавит любые другие параметры:

{
   "some":"param",
   "updated_at":"2021-07-08 10:49:44",    <--
   "created_at":"2021-07-08 10:39:55",
   "your":"new params"                    <--
}

Бесстыдный плагин: конвейеры и сценарии подробно рассматриваются в моем Руководстве по Elasticsearch.

person Joe Sorocin    schedule 08.07.2021