Java Concurrency на практике: состояние гонки в BoundedExecutor?

Есть что-то странное в реализации BoundedExecutor в книге Java Concurrency in Practice.

Предполагается, что он будет регулировать отправку задач исполнителю, блокируя отправляющий поток, когда в исполнителе достаточно потоков, либо поставленных в очередь, либо запущенных в исполнителе.

Это реализация (после добавления отсутствующего повторного броска в предложении catch):

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();

        try {
            exec.execute(new Runnable() {
                @Override public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }

Когда я создаю экземпляр BoundedExecutor с Executors.newCachedThreadPool() и границей 4, я ожидаю, что количество потоков, созданных кэшированным пулом потоков, никогда не превысит 4. Однако на практике это так. Я получил эту небольшую тестовую программу для создания целых 11 потоков:

public static void main(String[] args) throws Exception {
    class CountingThreadFactory implements ThreadFactory {
        int count;

        @Override public Thread newThread(Runnable r) {
            ++count;
            return new Thread(r);
        }           
    }

    List<Integer> counts = new ArrayList<Integer>();

    for (int n = 0; n < 100; ++n) {
        CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
        ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);

        try {
            BoundedExecutor be = new BoundedExecutor(exec, 4);

            for (int i = 0; i < 20000; ++i) {
                be.submitTask(new Runnable() {
                    @Override public void run() {}
                });
            }
        } finally {
            exec.shutdown();
        }

        counts.add(countingThreadFactory.count);
    }

    System.out.println(Collections.max(counts));
}

Я думаю, что между выпуском семафора и завершением задачи есть крошечный промежуток времени, когда другой поток может получить разрешение и отправить задачу, в то время как освобождающий поток еще не завершился. Другими словами, у него есть состояние гонки.

Кто-нибудь может это подтвердить?


person Jan Van den bosch    schedule 10.04.2012    source источник
comment
Я добавил 1 мс Thread.sleep сразу после semaphore.release(), чтобы увидеть, насколько хуже это станет: я создал более 300 потоков.   -  person toto2    schedule 10.04.2012


Ответы (3)


Я вижу целых 9 потоков, созданных одновременно. Я подозреваю, что существует состояние гонки, из-за которого возникает больше потоков, чем требуется.

Это может быть связано с тем, что до и после выполнения задачи необходимо выполнить работу. Это означает, что даже несмотря на то, что внутри вашего блока кода всего 4 потока, есть ряд потоков, останавливающих предыдущую задачу или готовящихся к запуску новой задачи.

то есть поток выполняет release(), пока он все еще работает. Даже если это последнее, что вы делаете, это не последнее, что он делает перед получением новой задачи.

person Peter Lawrey    schedule 10.04.2012
comment
Я согласен с вашим утверждением, которое, по сути, и подозревал Босси в первую очередь. Но я не уверен, что назвал бы это состоянием гонки, поскольку это подразумевает некоторую программную ошибку: я не думаю, что программа должна иметь максимальное количество потоков, как она написана. - person toto2; 10.04.2012
comment
@toto2 toto2 Это не обычная ошибка программирования, но существует состояние гонки между выпуском семафора и получением потоком новой задачи. Если бы задача существовала дольше, вы могли бы редко видеть такое поведение. - person Peter Lawrey; 10.04.2012
comment
Я понимаю про длину задачи (см. мой комментарий к вопросу Босси). Я просто имею в виду, что это не ошибка, потому что ничего не указывает на то, что должно быть 4 потока. Это не баг, это фича! В этом случае я думаю, что кто-то действительно мог бы сделать это заявление. Извините... Я просто начинаю философствовать. - person toto2; 10.04.2012
comment
Я не думаю, что это баг, но если кто-то захочет это исправить, я не вижу никакого способа (кроме, конечно, иметь 4-поточный исполнитель). - person toto2; 10.04.2012
comment
@ toto2: я хочу сказать, что BoundedExecutor должен помочь предотвратить избыток используемой памяти / созданных потоков, но в крайне редких случаях это просто не так. Это никогда не приводит к недопустимому состоянию программы, как это происходит в типичном состоянии гонки, но и не является на 100% надежным. Однако это не помешает мне использовать его в моих программах. - person Jan Van den bosch; 10.04.2012
comment
@Bossie Лично я бы просто использовал пул потоков фиксированного размера. Мне кажется проще. - person Peter Lawrey; 11.04.2012

BoundedExecutor действительно был задуман как иллюстрация того, как регулировать отправку задач, а не как способ ограничить размер пула потоков. Есть более прямые способы достижения последнего, как указано по крайней мере в одном комментарии.

Но в других ответах не упоминается текст в книге, в котором говорится об использовании неограниченной очереди и

установите границу семафора, равную размеру пула плюс количество задач в очереди, которые вы хотите разрешить, поскольку семафор ограничивает количество задач, которые выполняются в данный момент и ожидают выполнения. [JCiP, конец раздела 8.3.3]

Упоминая неограниченные очереди и размер пула, мы подразумевали (видимо, не очень ясно) использование пула потоков ограниченного размера.

Что меня всегда беспокоило в BoundedExecutor, так это то, что он не реализует интерфейс ExecutorService. Современным способом достижения аналогичной функциональности и реализации стандартных интерфейсов было бы использование listeningDecorator и ForwardingListeningExecutorService.

person Tim Peierls    schedule 11.04.2012
comment
Ответ от одного из экспертов, это потрясающе. Спасибо, что прояснил ситуацию, Тим. Эти ListenableFuture выглядят интересно. Итак, я должен отправить пару задач, и всякий раз, когда одна из них завершается, я отправляю новую в обратном вызове, верно? - person Jan Van den bosch; 11.04.2012
comment
@Bossie - Конечно, вы могли бы это сделать, но я имел в виду, что вы можете продолжать использовать семафорный подход BoundedExecutorService, украсив выполнение получением, а обратный вызов - выпуском. - person Tim Peierls; 04.06.2012

Вы правы в своем анализе состояния гонки. Нет никаких гарантий синхронизации между ExecutorService и семафором.

Однако я не знаю, используется ли ограничение количества потоков для BoundedExecutor. Я думаю, что это больше для ограничения количества задач, отправляемых в службу. Представьте, что у вас есть 5 миллионов задач, которые нужно отправить, и если вы отправите более 10 000 из них, вам не хватит памяти.

Ну, у вас будет только 4 потока, работающих в любой момент времени, зачем вам пытаться ставить в очередь все 5 миллионов задач? Вы можете использовать подобную конструкцию, чтобы регулировать количество задач, стоящих в очереди в любой момент времени. Что вы должны получить из этого, так это то, что в любой момент времени выполняется только 4 задачи.

Очевидно, что решение этого состоит в том, чтобы использовать Executors.newFixedThreadPool(4).

person John Vint    schedule 10.04.2012
comment
Точно. Задача, представленная базовому исполнителю, который запускает command, освобождает семафор до своего завершения, но это происходит в потоке исполнителя, выполняющем задачу. Это позволяет передать исполнителю другую задачу и передать ее новому потоку, что теоретически позволяет вдвое увеличить количество потоков, чем bound, при правильных условиях. - person David Harkness; 10.04.2012
comment
@John: он ограничивает количество отправленных задач, но непредсказуемым и ненадежным образом. Если потоки запланированы неудачным образом, это может означать, что множество отправляющих потоков могут проникнуть внутрь. Кроме того, в случае newFixedThreadPool() эти задачи могут все еще накапливаться в неограниченной очереди Executor, все еще рискуя нехваткой памяти. . @David: я получил целых 11 потоков с границей 4. Кроме того, я думаю, что забавно, что в справочнике по параллелизму Java все еще есть условия гонки. - person Jan Van den bosch; 10.04.2012
comment
@Bossie Вы правы, вы все еще подозреваете OOM с newFixedThreadPool, но это предотвратит создание лишних потоков. Вы используете привязку, чтобы предотвратить, как я уже упоминал, отправку миллионов задач одновременно. - person John Vint; 10.04.2012
comment
@Bossie - Вы правы, если многие потоки уступают после снятия блокировки и застаиваются, вы можете накопить гораздо больше потоков. - person David Harkness; 10.04.2012