Просматривайте все документы и массово обновляйте некоторые из них

Я использую клиент Jest для Elastic, чтобы просмотреть индекс документа и обновить одно поле. Мой рабочий процесс состоит в том, чтобы запустить пустой запрос с пейджингом и посмотреть, смогу ли я вычислить дополнительное поле. Если я могу, я обновляю соответствующие документы в одном массовом обновлении.

Псевдокод

private void process() {
    int from = 0
    int size = this.properties.batchSize
    boolean moreResults = true
    while (moreResults) {
        moreResults = handleBatch(from, this.properties.batchSize)
        from += size
    }
}

private boolean handleBatch(int from, int size) {
    log.info("Processing records $from to " + (from + size))
    def result = search(from, size)
    if (result.isSucceeded()) {
        // Check each element and perform an upgrade
    }
    // return true if the query returned at least one item
}

private SearchResult search(int from, int size) {
    String query =
            '{ "from": ' + from + ', ' +
                    '"size": ' + size + '}'


    Search search = new Search.Builder(query)
            .addIndex("my-index")
            .addType('my-document')
            .build();
    jestClient.execute(search)
}

У меня нет никакой ошибки, но когда я запускаю пакет несколько раз, похоже, что он находит «новые» документы для обновления, в то время как общее количество документов не изменилось. У меня возникло подозрение, что обновленный документ обрабатывался несколько раз, что я мог подтвердить, проверив обработанные идентификаторы.

Как я могу запустить запрос, чтобы исходные документы обрабатывались и никакие обновления не мешали ему?


person Stephane Nicoll    schedule 13.02.2016    source источник


Ответы (1)


Вместо обычного поиска (т. е. с использованием from+size) вам нужно запустить scroll поисковый запрос. Основное отличие состоит в том, что прокрутка заморозит данный снимок документов (во время запроса) и запросит их. Какие бы изменения ни произошли после первого запроса на прокрутку, они учитываться не будут.

Используя Jest, вам нужно изменить свой код, чтобы он выглядел примерно так:

    // 1. Initiate the scroll request
    Search search = new Search.Builder(searchSourceBuilder.toString())
            .addIndex("my-index")
            .addType("my-document")
            .addSort(new Sort("_doc"))
            .setParameter(Parameters.SIZE, size)
            .setParameter(Parameters.SCROLL, "5m")
            .build();
    JestResult result = jestClient.execute(search);

    // 2. Get the scroll_id to use in subsequent request
    String scrollId = result.getJsonObject().get("_scroll_id").getAsString();

    // 3. Issue scroll search requests until you have retrieved all results
    boolean moreResults = true;
    while (moreResults) {
        SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m")
                .setParameter(Parameters.SIZE, size).build();
        result = client.execute(scroll);
        def hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
        moreResults = hits.size() > 0;
    }

Вам необходимо изменить методы process и handleBatch с помощью приведенного выше кода. Это должно быть просто, дайте мне знать, если нет.

person Val    schedule 15.02.2016
comment
Круто, это решило мою проблему. Благодарю вас! - person Stephane Nicoll; 15.02.2016
comment
Круто, рад, что помог! - person Val; 15.02.2016