Добавляете новый HTTP-клиент в Elasticsearch, чтобы клиентские приложения могли работать с AWS Elasticsearch?

Я пытаюсь добавить HTTP-доступ Elasticsearch к клиент Titan ES с использованием JEST. titan-es поддерживает только локальный и транспортный (TCP) режимы ES. Но я хотел бы поддерживать связь через HTTP-интерфейс ES. Это позволит клиентским библиотекам, таким как titan-es, использовать AWS Elasticsearch в качестве серверной части индексирования, которая предоставляет только HTTP (С) интерфейс. Дополнительную информацию см. в этом сообщении.

Я ищу некоторые отзывы о подходе, который я рассматриваю до сих пор:

  1. Создайте новый класс ElasticsearchHttpClient, реализующий интерфейс org.elasticache.client.Client. Новый класс будет использовать JestClient в качестве внутреннего клиента. Таким образом, он будет связываться с ES через HTTP. Новый класс, скорее всего, расширит ES' AbstractClient, чтобы сократить методы, которые должны быть реализованы, до: admin(), settings(), execute(), threadPool() и close().
  2. Добавьте новое перечисление HTTP_CLIENT в ElasticSearchSetup
  3. Убедитесь, что метод connect() для HTTP_CLIENT возвращает экземпляр Connection, который содержит правильные значения для node и client. Член client будет экземпляром нового класса ElasticsearchHttpClient.
  4. Убедитесь, что метод ElasticSearchIndex.interfaceConfiguration() извлекает правильный экземпляр Connection (содержащий новый ElasticsearchHttpClient), если INTERFACE настроен как HTTP_CLIENT. С этого момента остальная часть кода должна продолжать работать над новым протоколом.

Это похоже на то, что это должно работать? Первый шаг меня больше всего беспокоит — я не уверен, что смогу реализовать все клиентские методы с помощью JestClient.

package com.thinkaurelius.titan.diskstorage.es;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;

public class ElasticsearchHttpClient extends AbstractClient {
    private final JestClient internalClient;
    private final ThreadPool pool;

    public ElasticsearchHttpClient(String hostname, int port) {
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder(String.format("http://%s:%d", hostname, port))
                .multiThreaded(true)
                .build());
        JestClient client = factory.getObject();

        this.pool = new ThreadPool("jest");
        this.internalClient = client;
    }

    @Override
    public AdminClient admin() {
        return null;
    }

    @Override
    public Settings settings() {
        return null;
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
        try {
            JestResult response = internalClient.execute(convertRequest(action, request));
            return convertResponse(response);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        execute(action, request);
    }

    private <Response extends ActionResponse> ActionFuture<Response> convertResponse(JestResult result) {
        // TODO How to convert a JestResult a Elasticsearch ActionResponse/ActionFuture?
        return null;
    }

    private <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> io.searchbox.action.Action<JestResult> convertRequest(Action<Request, Response, RequestBuilder, Client> action, Request request) {
        // TODO How to convert an Elasticsearch Action<..> and Request to a Jest Action<JestResult>?
        return null;
    }

    @Override
    public ThreadPool threadPool() {
        return pool;
    }

    @Override
    public void close() throws ElasticsearchException {
        pool.shutdownNow();
    }
}

[Я также задал этот вопрос в списке рассылки Titan и форум Elasticsearch.]


person Ingo    schedule 15.01.2016    source источник


Ответы (1)


Я разместил ответ в списке рассылки Titan.

Что вам нужно сделать с точки зрения Titan, так это реализовать интерфейс IndexProvider. Я предполагаю, что невозможно сделать так, чтобы Jest выглядел как полноценный клиент Elasticsearch.

Я думаю, вы бы использовали JestHttpClient — вам не нужно реализовывать интерфейс Jest. В IndexProvider есть методы для создания/удаления/изменения/запроса индекса, которые вы должны иметь возможность делать через HTTP. Ознакомьтесь с документацией Elasticsearch HTTP, чтобы узнать, можете ли вы выполнять все необходимые методы в IndexProvider с помощью JestHttpClient.

Уже существует реализация ElasticSearchIndex IndexProvider, которая выполняет NODE и TRANSPORT. Вы пытаетесь добавить параметр HTTP или JEST. Таким образом, вы можете подумать о том, чтобы втиснуть свои изменения в ElasticSearchIndex, но я не уверен, насколько хорошо это сработает, поскольку два существующих импликатора являются полноценными клиентами Elasticsearch. Возможно, рассмотрите возможность создания отдельного ElasticSearchHttpIndex, реализующего IndexProvider, если он чище.

person Jason Plurad    schedule 15.01.2016