Ошибка OutOfMemory с FixedThreadPool и ExecutorCompletionService

Я работаю над приложением, которое должно получать список пользователей из базы данных и обновлять данные из каталога (ldap или AD). Я выполняю эту процедуру на многоядерной машине, поэтому я создал это приложение (код ниже). Я использую CompletionService и получаю результаты в объекте Future.

Через некоторое время я получаю сообщение об ошибке нехватки памяти с сообщением «невозможно создать новый собственный поток». В диспетчере задач я вижу, что приложение создало огромное количество потоков, но я попросил создать фиксированный пул потоков с размером, равным количеству моих процессоров.

Что не так с моим кодом?

class CheckGroupMembership {
public static void main(String[] args) throws Exception {

    final ExecutorService executor = Executors.newFixedThreadPool(**Runtime.getRuntime().availableProcessors()**);

    CompletionService<LdapPerson> completionService =
        new ExecutorCompletionService(executor)<LdapPerson>(executor);

    final int limit = 2000;

    DocumentService service1 = new DocumentService();
    List<String> userNamesList = service1.getUsersListFromDB(limit);

    List<LdapPerson> ldapPersonList = new ArrayList() <LdapPerson> (userNamesList.size());
    LdapPerson person;

    for (String userName : userNamesList) {
        completionService.submit(new GetUsersDLTask(userName));
    }

    try {
        for (int i = 0, n = userNamesList.size(); i < n; i++) {
            Future<LdapPerson> f = completionService.take();
            person = f.get();
            ldapPersonList.add(person);
        }
    } catch (InterruptedException e) {

        System.out.println("InterruptedException error:" + e.getMessage());
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
    System.exit(0);
}
}

ОШИБКА CheckGroupMembership: 85 - java.lang.OutOfMemoryError: невозможно создать новый собственный поток java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: невозможно создать новый собственный поток в java.util.concurrent.FutureTask$Sync.innerGet( FutureTask.java:222) по адресу java.util.concurrent.FutureTask.get(FutureTask.java:83).

Задача GetuserDLs

public class GetUsersDLTask implements Callable<LdapPerson> {
private String userName;

public GetUsersDLTask(String u) {
    this.userName = u;
}

@Override
public LdapPerson call() throws Exception {
    LdapService service = new LdapService();
    return service.getUsersDLs(userName);
}

}

person Vik Gamov    schedule 27.10.2011    source источник
comment
Запросите дамп потока с вашей JVM. Как называются темы? Создает ли LdapService поток?   -  person Emil Sit    schedule 27.10.2011
comment
LdapService не создает никаких потоков. Я использую стандартный материал javax.naming.*   -  person Vik Gamov    schedule 27.10.2011
comment
Что находится в дампе потока от JVM до OOM? Как называются темы?   -  person Emil Sit    schedule 27.10.2011
comment
Названия потоков — Thread-2 и Thread-3.   -  person Vik Gamov    schedule 27.10.2011


Ответы (3)


Мне трудно поверить, что вы не создаете поток в GetUsersDLTask (или, по крайней мере, это сервисный объект). Если вы посмотрите на свою трассировку стека, исключение будет вызвано методом Future get(). Единственный способ установить это исключение - после того, как Executor вызовет Callabale.call(). Любой бросок, который встречается в методе call(), будет установлен во внутреннем поле exception Future.

Например:

Thread Pool: 
    Thread-1
      invoke call()
        call() 
          Create Thread
            throw OutOfMemoryError 
         propogate error to Thread pool
      set exception

В противном случае это исключение возникало бы при отправке запроса в пул потоков, а не при получении из будущего.

person John Vint    schedule 27.10.2011
comment
Да, я верю, что вы правы. В GetUsersDLTask я создал новый экземпляр LdapService, который каждый раз создает объект InitialContext (и создает новый поток для связи сокета с ldap). Так что ваше предположение о мнемокоде верно. Спасибо за совет! - person Vik Gamov; 27.10.2011

Executors.newFixedThreadPool примет отправку многих задач, но будет выполнять только то количество потоков, которое вы разрешите. Таким образом, если у вас есть фиксированный пул из 2 потоков, но вы отправляете 50 задач, 48 других задач ставятся в очередь внутри исполнителя и запускаются по мере того, как выполняющиеся потоки завершают задачи. Похоже, вам нужно ограничить количество потоков, которые вы создаете в своем коде.

Изменить: проверьте http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)

person Noah    schedule 27.10.2011
comment
Но создал ли executor оставшиеся 48 потоков в то время? Как я могу контролировать создание потоков? - person Vik Gamov; 27.10.2011
comment
Как показывает предлагаемое изменение, вы отправляете задачи, а пул потоков управляет потоками. - person Emil Sit; 27.10.2011
comment
а, почему голосование против? У вас заканчивается память, потому что вы создаете слишком много потоков - исполнитель возьмет столько, сколько вы ему дадите, но выполнит только число, указанное в фиксированном пуле. Вы можете контролировать создание потоков, потому что именно вы отправляете потоки исполнителю; вам действительно нужен новый поток для каждого имени пользователя? Возможно, разбейте их на группы по 100 или около того и отправьте это в поток. - person Noah; 27.10.2011
comment
Вопрос и комментарий были двумя отдельными мыслями. Не беспокойся :) - person Noah; 27.10.2011
comment
Согласно этому примеру javadoc download.oracle.com /javase/6/docs/api/java/util/concurrent/ Я не делаю ничего запрещенного с ECS. Я не создаю темы явно. - person Vik Gamov; 27.10.2011
comment
Да - создание тем/задач выполняется корректно. Вы просто создаете их тонну — по одному на имя пользователя. Вам нужно так много? - person Noah; 27.10.2011
comment
GetUsersDLTask реализует Callable. Callable похож на Runnable; новый Callable при «отправке» исполнителю создаст новый поток (и вернет FtureTask) и, возможно, будет добавлен в очередь. CompletionService принимает экземпляр исполнителя; это просто оболочка, которая использует потоки за кулисами. В этом примере каждый отправленный Callable будет преобразован в поток CompletionService с помощью Executor. - person Noah; 27.10.2011
comment
@sqrv Это неправда. Когда вы отправляете Callable/Runnable, он будет помещен в рабочую очередь ES. Если пул потоков фиксирован, то каждый Runnable/Callable будет использоваться только тогда, когда поток доступен и может опрашивать очередь. Если у вас есть Executors.newCachedThreadPool, то да, каждый отправленный Callable будет порождать новый поток, если поток не простаивает. это не тот случай - person John Vint; 27.10.2011
comment
Ах! Я смешиваю термины «Задача» и «Поток» как одно и то же, хотя на самом деле существует функциональное различие. Спасибо за разъяснение Джон. - person Noah; 27.10.2011

Вы проверили количество потоков, созданных в фиксированном пуле. Возможно, количество доступных процессоров слишком велико.

person Dave    schedule 27.10.2011