Мы используем Java-клиент elasticsearch REST (мы работаем на Java 7, поэтому не можем использовать обычный Java-клиент elasticsearch) для взаимодействия с нашими серверами elasticsearch. Все это работает нормально, за исключением случаев, когда мы пытаемся выполнить начальную индексацию около 1,3 млн документов. Это работает какое-то время, но после нескольких сотен тысяч документов мы получаем
20/06 21:27:33,153 ERROR [cid=51][stderr][write:71] (pool-837116-thread-1) Exception in thread "pool-837116-thread-1" java.lang.OutOfMemoryError: unable to create new native thread
20/06 21:27:33,154 ERROR [cid=51][stderr][write:71] (pool-837116-thread-1) at java.lang.Thread.start0(Native Method)
20/06 21:27:33,154 ERROR [cid=51][stderr][write:71] (pool-837116-thread-1) at java.lang.Thread.start(Thread.java:693)
20/06 21:27:33,154 ERROR [cid=51][stderr][write:71] (pool-837116-thread-1) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:334)
20/06 21:27:33,154 ERROR [cid=51][stderr][write:71] (pool-837116-thread-1) at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:194)
20/06 21:27:33,154 ERROR [cid=51][stderr][write:71] (pool-837116-thread-1) at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
20/06 21:27:33,155 ERROR [cid=51][stderr][write:71] (pool-837116-thread-1) at java.lang.Thread.run(Thread.java:724)
с последующим
java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED
at org.apache.http.util.Asserts.check(Asserts.java:46)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.ensureRunning(CloseableHttpAsyncClientBase.java:90)
at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:123)
at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:343)
at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:325)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:218)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:191)
Как видите, клиент Elasticsearch REST использует apache http nio. Что мне показалось странным, так это то, что библиотека nio создает поток для каждого отдельного запроса (или соединения?). Из приведенного выше журнала вы можете увидеть поток (pool-837116-thread-1). Существует также множество потоков диспетчера ввода-вывода с растущим числом.
Однако общее количество живых потоков, похоже, не сильно изменилось. Таким образом, кажется, что вместо повторного использования потоков для каждого цикла подключения создается (или два) новый поток. Загрузка в основном:
<сильный>1. Создать клиента
restClient = RestClient.builder(new HttpHost(host.getHost(),host.getPort(),host.getProtocol())/*,new HttpHost(host.getHost(),host.getPort()+1,host.getProtocol())*/)
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setDefaultCredentialsProvider(credsProvider)
}
}).setMaxRetryTimeoutMillis(30000).build();
<сильный>2. Отправить запрос с телом json и закрыть клиент
try{
HttpEntity entity = new NStringEntity(json,ContentType.APPLICATION_JSON);
Response indexResponse = restClient.performRequest("PUT", endpoint, parameters,entity,header);
log.debug("Response #0 #1", indexResponse,indexResponse.getStatusLine());
log.debug("Entity #0",indexResponse.getEntity());
}finally{
if(restClient!=null){
log.debug("Closing restClient #0", restClient);
restClient.close();
}
}
Это нормально? Почему apache nio не использует потоки повторно? Это проблема с REST-клиентом elasticsearch, apache nio или моим кодом? Я вызываю close на restClient, не уверенный, что еще я должен сделать.
Я попытался установить количество потоков на 1 на IO Reactor:
restClient = RestClient.builder(new HttpHost(host.getHost(),host.getPort(),host.getProtocol())/*,new HttpHost(host.getHost(),host.getPort()+1,host.getProtocol())*/)
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setDefaultCredentialsProvider(credsProvider)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build()); //set to one thread
}
}).setMaxRetryTimeoutMillis(30000).build();
но это ничего не изменило в отношении повторного использования потоков.