Java Executors: как установить приоритет задачи?

Есть ли возможность установить приоритет задач, выполняемых Исполнителями? Я нашел несколько утверждений в JCIP о том, что это возможно, но я не могу найти ни одного примера и не могу найти ничего, связанного с этим, в документации.

Из JCIP:

Политика выполнения определяет «что, где, когда и как» выполнения задачи, в том числе:

  • ...
  • В каком порядке должны выполняться задачи (FIFO, LIFO, порядок приоритета)?
  • ...

UPD: я понял, что спросил не совсем то, что хотел спросить. На самом деле я хотел:

Как использовать / имитировать установку приоритета потоков (т.е. что было thread.setPriority()) с фреймворком исполнителей?


person Roman    schedule 07.07.2010    source источник


Ответы (8)


В настоящее время единственные конкретные реализации интерфейса Executor - это ThreadPoolExecutor и ScheduledThreadpoolExecutor

Вместо использования служебного / фабричного класса Executors, вы должны создать экземпляр с помощью конструктора.

Вы можете передать BlockingQueue в конструкторы ThreadPoolExecutor.

Одна из реализаций BlockingQueue, PriorityBlockingQueue позволяет передать компаратор конструктору, таким образом позволяя вам определять порядок выполнения.

person Davy Meers    schedule 07.07.2010
comment
+1 PriorityBlockingQueue - это то, что вам нужно. Вы можете реализовать Компаратор или сделать сами задачи Сопоставимыми. - person Tim Bender; 08.07.2010
comment
Эта статья является отличным справочником: binkley.blogspot.fr /2009/04/jumping-work-queue-in-executor.html - person Snicolas; 04.06.2013
comment
Мое решение упорядочивает задачи по приоритету, но сохраняет порядок отправки с теми же уровнями приоритета: stackoverflow.com/a/42831172/1386911 - person Daniel Hári; 16.03.2017

Идея здесь состоит в том, чтобы использовать PriorityBlockingQueue в исполнителе. Для этого:

  • Создайте компаратор, который сравнивал бы наши фьючерсы.
  • Создайте прокси для будущего, чтобы иметь приоритет.
  • Переопределите newTaskFor, чтобы обернуть каждое будущее в нашем прокси.

Во-первых, вам нужно иметь в приоритете свое будущее:

    class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }
}

Затем вам нужно определить компаратор, который будет правильно сортировать приоритетные фьючерсы:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

Теперь предположим, что у нас есть такая длительная работа:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

Тогда для приоритетного выполнения этих заданий код будет выглядеть так:

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Это большой объем кода, но это почти единственный способ сделать это.

На моей машине результат выглядит следующим образом:

Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97
person Stanislav Vitvitskyy    schedule 16.05.2013
comment
Хотя принятый ответ действительно отвечает на вопрос, он предлагает рабочее решение. Большое спасибо. - person m02ph3u5; 06.10.2015
comment
Спасибо за Ваш ответ. Можно ли использовать этот подход с ExecutorCompletionService? Я попытался передать ваш объект ExecutorService в конструктор ExecutorCompletionService, но результат не может быть передан в PriorityFuture в компараторе. - person Arash; 28.11.2015
comment
Тестировал на своей машине. Это не так. На моей машине я получил «Выполнение 72 перед выполнением 3», что явно неверно. - person Farhan Shirgill Ansari; 03.11.2016
comment
Самое чистое решение, которое я мог найти для вызываемых вакансий, спасибо. - person Raghu; 27.09.2017
comment
Привет, я столкнулся с ошибкой приведения типа при использовании приведенного выше кода .... int p1 = ((PriorityFuture<?>) o1).getPriority(); java.util.concurrent.FutureTask не может быть преобразован в com.i2c.loyalty.handlers.datasynchandler.utils.InstancePriorityFuture это InstancePriorityFuture реализует RunnableFuture может кто-то помочь пожалуйста? - person Adnan Amman Ullah; 01.10.2019

Вы можете реализовать свой собственный ThreadFactory и установить его в ThreadPoolExecutor следующим образом:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

где мой OpJobThreadFactory выглядит следующим образом:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}
person dfreis    schedule 18.09.2013

вы можете использовать ThreadPoolExecutor с очередью блокировки приоритета Как сделать реализовать PriorityBlockingQueue с ThreadPoolExecutor и настраиваемыми задачами

person dupdup    schedule 22.04.2011

Вы можете указать ThreadFactory в ThreadPoolExecutor конструктор (или Executors заводской метод). Это позволяет вам предоставлять потоки с заданным приоритетом потока для исполнителя.

Чтобы получить разные приоритеты потоков для разных заданий, вам нужно отправить их исполнителям с разными фабриками потоков.

person Michael Brewer-Davis    schedule 11.01.2013

Имейте в виду, что setPriority (..) обычно не работает под Linux. См. Следующие ссылки для получения полной информации:

person aanno    schedule 25.06.2012
comment
Комментарии - это комментарии; ответы есть ответы. Комментарии - это не ответы. Ответы - это не комментарии. Если он не отвечает на заданный вопрос, это, по сути, комментарий. - person Engineer; 30.09.2012
comment
+1 @Nick - Ха-ха, в восторге! Зачем использовать одно слово, если можно использовать длинный и язвительный комментарий. Хорошая мысль и хорошо (нахально) сделана. - person TedTrippin; 19.04.2013

Просто хочу добавить свой вклад в это обсуждение. Я реализовал это ReorderingThreadPoolExecutor для очень конкретной цели, которая может явным образом выводить на передний план BlockingQueue исполнителя (в данном случае LinkedBlockingDeque), когда я хочу, и без необходимости иметь дело с приоритетами (что могут привести к тупикам и в любом случае исправлены).

Я использую это для управления (внутри приложения Android) случаем, когда мне нужно загружать много изображений, которые отображаются в виде длинного списка. Всякий раз, когда пользователь быстро прокручивает страницу вниз, очередь исполнителя заполняется запросами на загрузку изображений: перемещая последние из них в начало очереди, я добился гораздо лучших результатов при загрузке изображений, которые фактически находятся на экране, задерживая загрузку те, которые, вероятно, понадобятся позже. Обратите внимание, что я использую внутренний параллельный ключ карты (который может быть таким же простым, как строка URL-адреса изображения), чтобы добавить задачи к исполнителю, чтобы я мог получить их позже для переупорядочения.

Было бы много других способов сделать то же самое, и, возможно, это слишком сложно, но он работает нормально, а также Facebook в своем Android SDK делает что-то подобное в своей собственной очереди рабочих потоков.

Не стесняйтесь взглянуть на код и дать мне предложения, он находится внутри проекта Android, но удаление нескольких журналов и аннотаций сделает класс чистым Java 6.

person fast3r    schedule 27.06.2013
comment
Спасибо, что заметили этого @Robert, я обновил ссылку на правильный URL - person fast3r; 19.01.2020

Если это просто попытка отдать предпочтение одному потоку над другим, а не гарантировать какой-либо порядок. В Callable вы передаете установленный приоритет потока в начале метода call ():

private int priority;

MyCallable(int priority){
this.priority=priority;
}

public String call() {

     logger.info("running callable with priority {}", priority);
     Thread.currentThread().setPriority(priority);

// do stuff

     return "something";
}

все еще зависит от базовой реализации, чтобы соблюдать приоритет потока, хотя

person dezzer10    schedule 16.03.2021