Есть ли способ вернуть задачи в очередь исполнителя

У меня есть ряд задач (т.е. Runnables), которые должен выполнить Executor.
Каждая задача требует выполнения определенного условия для продолжения. Мне было бы интересно узнать, есть ли способ каким-то образом настроить Executor для перемещения задач в конец очереди и попытки выполнить их позже, когда условие будет действительным и задача сможет выполняться и завершаться.
Итак, поведение будет примерно таким:

  1. Thread-1 взять задачи из очереди и run вызвать
  2. Внутри run условие еще не выполняется
  3. Задача останавливается, и Thread-1 помещает задачу в конец очереди и получает следующую задачу для выполнения
  4. Позже Thread-X (из пула потоков) снова выбирает задачу из очереди, условие допустимо, и задача выполняется

person Cratylus    schedule 20.11.2012    source источник
comment
Просто создайте новую задачу, равную текущей, и поставьте ее в очередь. Завершить текущую задачу   -  person hoaz    schedule 21.11.2012
comment
@hoaz: Вы имеете в виду ссылку на исполнителя внутри задач?   -  person Cratylus    schedule 21.11.2012
comment
@Cratylus - да, если у задачи есть ссылка на исполнителя, она может повторно поставить себя в очередь. просто убедитесь, что очередь ожидающих выполнения задач исполнителя не ограничена, иначе вы можете заблокировать себя.   -  person jtahlborn    schedule 21.11.2012


Ответы (3)


В Java 6 конструктор ThreadPoolExecutor принимает BlockingQueue<Runnable>, который используется для хранения задач в очереди. Вы можете реализовать такую ​​блокирующую очередь, которая переопределяет poll(), чтобы при попытке удалить и выполнить «готовое» задание poll продолжалось как обычно. В противном случае исполняемый объект помещается в конец очереди, и вы пытаетесь снова выполнить опрос, возможно, после короткого тайм-аута.

person mbatchkarov    schedule 20.11.2012
comment
Разве это не зависит от деталей реализации? Как я могу быть уверен, что реализация будет использовать poll. Я не хочу, чтобы она основывалась, например, на Oracle исходном коде - person Cratylus; 21.11.2012
comment
Я не знаю, использует ли он poll наверняка. Я знаю, что он будет использовать один из remove(), poll(), take() или poll(time, unit), потому что это методы удаления из очереди блокировки в соответствии с документацией. - person mbatchkarov; 21.11.2012
comment
Кроме того, хотя ваше предложение привлекательно (+1), я не вижу, как оно здесь поможет. Проверяемое условие не является каким-то глобальным условием, оно является частью состояния Runnable во время run - person Cratylus; 21.11.2012
comment
Вопрос ничего не говорит о состоянии :) - person mbatchkarov; 21.11.2012
comment
Это не состояние гонки. Это логическое состояние (if(...)). - person Cratylus; 21.11.2012
comment
Я имел в виду, что вы не указали, является ли условие локальным для Runnable или более глобальным. - person mbatchkarov; 21.11.2012
comment
Это локально для Runnable - person Cratylus; 22.11.2012

Если вам не нужно занятое ожидание, вы можете добавить повторяющуюся задачу в ScheduledExecutorService с соответствующим интервалом опроса, которую вы отменяете или убиваете после того, как она «действительна» для запуска.

ScheduleExecutorService ses = ...

ses.scheduleAtFixedRate(new Runnable() {
    public void run() {
        if (!isValid()) return;
        preformTask();
        throw new RuntimeException("Last run");
    }
}, PERIOD, PERIOD, TimeUnit.MILLI_SECONDS);
person Peter Lawrey    schedule 20.11.2012
comment
Для чего нужен RuntimeException? - person Cratylus; 21.11.2012
comment
Это самый простой способ остановить повторение задачи. Есть альтернативы, но они не менее уродливы и более сложны. - person Peter Lawrey; 21.11.2012
comment
Исключение не повлияет на следующие задачи в очереди, верно? - person Cratylus; 21.11.2012
comment
ExecutorService предназначен для захвата исключения или ошибки и сохранения в возвращаемом объекте Future. (отброшено в этом примере) - person Peter Lawrey; 21.11.2012
comment
Мне нравится это решение, оно намного чище моего, но как отличить реальные ошибки времени выполнения от сигнальных? - person didierc; 22.11.2012
comment
@PeterLawrey: Вызовет ли выполнение: ses.scheduleAtFixedRate(this, PERIOD, PERIOD, TimeUnit.MILLI_SECONDS); внутри запланированного исполняемого файла какие-либо проблемы? - person Cratylus; 23.11.2012
comment
Каждый раз, когда вы вызываете это, вы добавляете новую повторяющуюся работу. Вы не хотите создавать экспоненциальное увеличение количества выполняемых заданий, поэтому я не думаю, что это было бы очень полезно. - person Peter Lawrey; 23.11.2012

Сначала создайте исполнителя.

У вас есть несколько возможностей.

Если я предполагаю, что ваши задачи реализуют простой интерфейс для запроса их статуса (что-то вроде перечисления с «NeedReschedule» или «Completed»), тогда реализуйте оболочку (реализующую Runnable) для ваших задач, которая будет принимать задачу и исполнителя в качестве экземпляра параметры. Эта оболочка запустит задачу, к которой она привязана, затем проверит ее статус и, если необходимо, перепланирует свою копию в исполнителе перед завершением.

В качестве альтернативы вы можете использовать механизм выполнения, чтобы сообщить оболочке, что задача должна быть перепланирована. Это решение проще в том смысле, что оно не требует определенного интерфейса для вашей задачи, так что простой Runnable можно без проблем бросить в систему. Однако исключения требуют больше времени вычислений (построение объекта, трассировка стека и т. д.).

Вот возможная реализация оболочки с использованием механизма сигнализации исключений. Вам нужно реализовать класс RescheduleException, расширяющий Throwable, который может быть запущен обернутым исполняемым объектом (нет необходимости в более конкретном интерфейсе для задачи в этой настройке). Вы также можете использовать простой RuntimeException, как предложено в другом ответе, но вам нужно будет проверить строку сообщения, чтобы узнать, является ли это исключением, которого вы ждете.

 public class TaskWrapper implements Runnable {

    private final ExecutorService executor;
    private final Runnable task;       

    public TaskWrapper(ExecutorService e, Runnable t){
         executor = e;
         task = t;
    }

@Override
public void run() {

    try {
               task.run();
    } 
    catch (RescheduleException e) {
        executor.execute(this);
    }
}

Вот очень простое приложение, запускающее 200 упакованных задач, случайным образом запрашивающих перенос.

class Task implements Runnable {

  @Override
  public void run(){
     if (Maths.random() > 0.5)
      throw new RescheduleException();
   }     
}


public class Main {

public static void main(String[] args){

    ExecutorService executor = Executors.newFixedThreadPool(10);

    int i = 200;
            while(i--)
       executor.execute(new TaskWrapper(executor, new Task());
}
}

У вас также может быть выделенный поток для отслеживания результатов других потоков (с использованием очереди сообщений) и перепланирования при необходимости, но вы теряете один поток по сравнению с другим решением.

person didierc    schedule 20.11.2012
comment
Не могли бы вы подробнее рассказать о ваших предложениях? - person Cratylus; 21.11.2012
comment
This wrapper will run the task it is bound to, check its status afterwards.Как?Используя Callables? - person Cratylus; 21.11.2012
comment
Я придумал механизм исключений, когда писал код, кажется чище. - person didierc; 21.11.2012
comment
Параметр-обертка может быть даже простым Executor в моей реализации. - person didierc; 21.11.2012
comment
RescheduleException мой класс? Нет ли более подходящего существующего исключения? - person Cratylus; 22.11.2012
comment
Кроме того, как я это вижу, с кодом, который вы разместили, перепланированная задача будет/может быть выполнена немедленно, как только вы переназначите повторную отправку. Было бы лучше отложить выполнение. - person Cratylus; 22.11.2012
comment
действительно, небольшая задержка может быть желательной. Обратите внимание, что вы не определяете обстоятельства изменения расписания, и, возможно, было бы лучше, чтобы некоторые задачи были приостановлены и определены ожидаемые события, которые вызовут активацию задачи. - person didierc; 22.11.2012