Политика ThreadPoolExecutor

Я пытаюсь использовать ThreadPoolExecutor для планирования задач, но у меня возникают проблемы с его политиками. Вот его заявленное поведение:

  1. Если выполняется меньше, чем corePoolSize потоков, Executor всегда предпочитает добавлять новый поток, а не ставить в очередь.
  2. Если corePoolSize или несколько потоков запущены, Executor всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.
  3. Если запрос не может быть поставлен в очередь, создается новый поток, если он не превысит maximumPoolSize, и в этом случае задача будет отклонена.

Я хочу следующее поведение:

  1. то же, что и выше
  2. Если количество запущенных потоков превышает corePoolSize, но меньше maximumPoolSize, предпочтительнее добавлять новый поток, а не ставить в очередь, и использовать неактивный поток, а не добавлять новый поток.
  3. то же, что и выше

В основном я не хочу, чтобы какие-либо задачи отклонялись; Я хочу, чтобы они стояли в неограниченной очереди. Но я действительно хочу иметь потоки до максимального размера пула. Если я использую неограниченную очередь, она никогда не генерирует потоки после достижения coreSize. Если я использую ограниченную очередь, она отклоняет задачи. Есть ли способ обойти это?

Сейчас я думаю о том, чтобы запустить ThreadPoolExecutor в SynchronousQueue, но не передавать задачи непосредственно ему, а вместо этого передавать их в отдельную неограниченную LinkedBlockingQueue. Затем другой поток переходит из LinkedBlockingQueue в Executor, и если один из них отклоняется, он просто пытается снова, пока он не будет отклонен. Это похоже на боль и немного взлома - есть ли более чистый способ сделать это?


person Joe K    schedule 05.08.2010    source источник


Ответы (4)


Ваш вариант использования является обычным, полностью законным и, к сожалению, более сложным, чем можно было бы ожидать. Для получения дополнительной информации вы можете прочитать это обсуждение и найти указатель на решение (также упомянутый в теме) здесь. Решение Шэя работает нормально.

Обычно я бы немного опасался неограниченных очередей; Обычно лучше иметь явный контроль входящего потока, который постепенно ухудшается и регулирует соотношение текущей / оставшейся работы, чтобы не перегружать ни производителя, ни потребителя.

person Holger Hoffstätte    schedule 06.08.2010

Вероятно, нет необходимости в микроуправлении пулом потоков по запросу.

Кэшированный пул потоков будет повторно использовать незанятые потоки, а также разрешить потенциально неограниченное количество одновременных потоков. Это, конечно, может привести к снижению быстродействия из-за накладных расходов на переключение контекста во время периодов скачка.

Executors.newCachedThreadPool();

Лучше всего установить ограничение на общее количество потоков, отказавшись от идеи, что сначала будут использоваться незанятые потоки. Изменения конфигурации будут следующими:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

Рассуждая по этому сценарию, если у исполнителя меньше, чем corePoolSize потоков, он не должен быть очень занят. Если система не очень загружена, запуск нового потока не повредит. Это приведет к тому, что ваш ThreadPoolExecutor всегда будет создавать нового воркера, если его количество меньше максимально разрешенного. Только когда максимальное количество рабочих «работает», работникам, праздно ожидающим выполнения задач, будут даны задания. Если рабочий ожидает aReasonableTimeDuration без задачи, ему разрешается завершить работу. Используя разумные ограничения для размера пула (в конце концов, существует только определенное количество процессоров) и достаточно большой тайм-аут (чтобы потоки не завершались без необходимости), желаемые преимущества, вероятно, будут видны.

Последний вариант - хакерский. По сути, ThreadPoolExecutor внутренне использует BlockingQueue.offer, чтобы определить, есть ли у очереди емкость. Специальная реализация BlockingQueue всегда может отклонить попытку offer. Когда ThreadPoolExecutor не удается offer задачу в очередь, он пытается создать нового рабочего. Если новый рабочий не может быть создан, будет вызван RejectedExecutionHandler. В этот момент пользовательский RejectedExecutionHandler может принудительно вставить put в пользовательский BlockingQueue.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}
person Tim Bender    schedule 06.08.2010

Просто установите corePoolsize = maximumPoolSize и используйте неограниченную очередь?

В вашем списке очков 1 исключает 2, поскольку corePoolSize всегда будет меньше или равно maximumPoolSize.

Изменить

Между тем, что вы хотите, и тем, что вам предложит TPE, все еще есть что-то несовместимое.

Если у вас неограниченная очередь, maximumPoolSize игнорируется, поэтому, как вы заметили, никогда не будет создано и использовано более corePoolSize потоков.

Итак, опять же, если вы возьмете corePoolsize = maximumPoolSize с неограниченной очередью, вы получите то, что хотите, нет?

person eljenso    schedule 05.08.2010
comment
Ой, я написал не совсем то, что я хотел. Я редактировал оригинал. - person Joe K; 06.08.2010
comment
Установка corePoolsize = maximumPoolSize действительно близка, но я также использую allowCoreThreadTimeOut (false) и prestartAllCoreThreads (). - person Joe K; 06.08.2010

Хотели бы вы чего-то более похожего на кешированный пул потоков?

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()

person vimalloc    schedule 06.08.2010