Как получить доступ к базовой очереди ThreadpoolExecutor потокобезопасным способом

Метод getQueue() обеспечивает доступ к базовой очереди блокировки в ThreadPoolExecutor, но это не кажется безопасным.

Обход очереди, возвращаемой этой функцией, может пропустить обновления, внесенные в очередь ThreadPoolExecutor.

«Метод getQueue() позволяет получить доступ к рабочей очереди в целях мониторинга и отладки. Использование этого метода для любых других целей настоятельно не рекомендуется».

Что бы вы сделали, если бы захотели пройти через workQueue, используемую ThreadPoolExecutor? Или есть альтернативный подход?

Это продолжение... Выбор данных структура варианта задачи производителя-потребителя

Теперь я пробую несколько потребителей с несколькими производителями, но я хочу использовать какой-то существующий пул потоков, так как я не хочу сам управлять пулом потоков, а также мне нужен обратный вызов, когда ThreadPoolExecutor завершил выполнение некоторой задачи вместе с возможностью проверки потокобезопасным способом структуру данных "незавершенных транзакций".


person Abhijeet Kashnia    schedule 09.05.2011    source источник


Ответы (3)


Вы можете переопределить методы beforeExecute и afterExecute, чтобы сообщить, что задача запущена и завершена. Вы можете переопределить execute(), чтобы узнать, когда добавляется задача.

Ваша проблема заключается в том, что очередь не предназначена для запросов, и задача может быть использована до того, как вы ее увидите. Один из способов обойти это - создать собственную реализацию очереди (возможно, переопределив/обернув ConcurrentLinkedQueue)

Кстати: очередь потокобезопасна, однако не гарантируется, что вы увидите каждую запись.

ConcurrentLinkedQueue.iterator() задокументирован как

Возвращает итератор по элементам в этой очереди в правильной последовательности. Возвращенный итератор является «слабо согласованным» итератором, который никогда не вызовет исключение ConcurrentModificationException и гарантирует обход элементов в том виде, в каком они существовали при построении итератора, и может (но не обязательно) отражать любые модификации после построения.

person Peter Lawrey    schedule 09.05.2011
comment
Я могу сделать снимок в этот момент времени, например Collections.unmodifiable(), но мне нужно знать, что пока я делаю этот снимок, очередь не изменится. Итак, мне нужно что-то заблокировать в очереди, но ThreadPoolExecutor не будет соблюдать мои ограничения блокировки. Вы предлагаете мне создать свою собственную CustomBlockingqueue‹Runnable›, которую я передам ThreadPoolExecutor? Я не понимаю, как это поможет, пожалуйста, уточните. - person Abhijeet Kashnia; 09.05.2011
comment
Можно ли рассматривать Collections.unmodified() как атомарную операцию? Потому что я думаю, что только атомарные операции будут потокобезопасными. - person Abhijeet Kashnia; 09.05.2011
comment
У вас есть проблема в том, что ConcurrentLinkedQueue предназначен для параллельной/беззамковой организации очереди. ThreadPoolExecutor не имеет права голоса в этом вопросе. Вы можете дать ему очередь, которая ведет себя так, как вы хотите. - person Peter Lawrey; 09.05.2011
comment
Вы не можете использовать Collections.unmodifiableXxxx() с очередями. Он только обертывает базовые коллекции и не копирует их. Атомарные операции легче сделать потокобезопасными, однако вы можете сделать итератор потокобезопасным, предварительно заблокировав коллекцию. (при условии, что коллекция соблюдает замки) - person Peter Lawrey; 09.05.2011
comment
Не могли бы вы также расширить утверждение Очередь является потокобезопасной, однако не гарантируется, что вы увидите каждую запись. Я думаю, что безопасность потоков подразумевает, что каждая запись будет видна. - person Abhijeet Kashnia; 10.05.2011
comment
Потокобезопасность означает, что он гарантированно будет вести себя как задокументировано при изменении несколькими потоками. Коллекции, которые не являются потокобезопасными, не гарантируют, что они будут вести себя как задокументировано при изменении несколькими потоками. (Они даже не гарантируют, что вы увидите исключение ConcurrentModificationException). Проблема в том, что вы хотите, чтобы оно вело себя иначе, чем задокументировано. Вы не можете сделать это, не изменив поведение очереди. - person Peter Lawrey; 10.05.2011

Если вы хотите скопировать элементы в очереди и убедиться, что то, что у вас есть в очереди, не было выполнено, вы можете попробовать это:

а) Ввести возможность приостанавливать и возобновлять выполнение. См.: http://download.oracle.com/javase/1,5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html

б) сначала приостановить очередь, затем скопировать очередь, затем возобновить очередь.

И тогда у меня есть свой вопрос. Проблема, которую я вижу, заключается в том, что пока вы выполняете свой «Runnable», этот «Runnable» не помещается в очередь, а «обертка» FutureTask, и я не могу найти никакого способа определить, какой именно из моих runnables я ищу в. Таким образом, захват и изучение очереди довольно бесполезны. Кто-нибудь знает, что я там пропустил?

person Paul G    schedule 19.05.2011
comment
Спасибо, Пол, последний абзац вашего ответа имеет смысл, так как я понял, что мне нужно поддерживать мои runnables в отдельной структуре данных. - person Abhijeet Kashnia; 06.07.2011

Если вы следуете совету Джона Скита в своей принятый ответ на ваш предыдущий вопрос, тогда вы будете контролировать доступ к своим очередям с помощью блокировок. Если вы заблокируете незавершенную очередь, вы можете гарантировать, что обход не пропустит ни одного элемента в ней.

Проблема с этим, конечно, заключается в том, что пока вы выполняете обход, все другие операции в очереди (другие производители и потребители, пытающиеся получить к ней доступ) будут блокироваться, что может иметь довольно сильное влияние на производительность.

person Qwerky    schedule 09.05.2011
comment
Спасибо за ответ. Да, я пытаюсь следовать его совету с дополнительным использованием threadExecutor для обработки управления пулом потоков для меня. Я думаю, что мог бы сделать это самостоятельно без ThreadPoolExecutor, но тогда мне придется написать дополнительный код для обработки пула потоков и обратных вызовов по завершении задачи, чего я пытаюсь избежать. - person Abhijeet Kashnia; 09.05.2011