Хотите, чтобы ThreadPoolExecutor мгновенно выполнял задачи

У меня есть ThreadPoolExecutor с одним потоком, который будет использоваться для пакетной обработки. Поэтому, прежде чем назначать новую задачу исполнителю, мне нужно дождаться завершения предыдущей задачи, я делал это в зависимости от значения для активных заданий, но видя подробно я обнаружил, что исполнитель не выполняет задачу мгновенно.

Проблема, которую это вызывает у меня, заключается в том, что я готов дать следующую партию, но первая задача еще не запущена, поэтому значение активных заданий равно 0.

Как я могу запустить задачу мгновенно. Я также согласен с любым другим исполнителем или способом, которым это можно сделать.


person user996808    schedule 02.07.2014    source источник
comment
Вы получаете Future назад, когда отправляете() задачи исполнителю пула потоков. Сохраните это возвращаемое значение, и вы можете просто подождать этого Future, чтобы убедиться, что ваши предыдущие задачи завершены. Однако есть ли какая-либо особая причина, по которой вы должны ждать завершения предыдущих задач, прежде чем отправлять новые пакетные задания?   -  person nos    schedule 02.07.2014
comment
Ну, если у вас есть пул с 1 потоком, и вам нужно подождать, прежде чем отправлять следующую задачу, то вам не нужен никакой поток...   -  person Jean Logeart    schedule 02.07.2014
comment
Я действительно не понимаю, чего вы пытаетесь достичь: если вы хотите дождаться завершения предыдущей задачи, зачем вам использовать исполнителя?   -  person Pierre Rust    schedule 02.07.2014
comment
Сценарий заключается в том, что у меня есть много данных, которые будут обработаны, а затем записаны в файл, поэтому все это будет выполняться партиями. Поэтому, как только я обработал фиксированный объем данных, он передается потоку записи, который является исполнителем, и продолжает обрабатывать больше данных, но прежде чем я передам следующий пакет писателю, я должен дождаться более раннего задания, потому что они могут писать в тот же файл.   -  person user996808    schedule 02.07.2014
comment
@user996808 user996808, если у вас есть ThreadPoolExecutor только с 1 потоком, ваши задания будут выполняться в том порядке, в котором вы отправляете ему задания, по одному заданию за раз. И, естественно, исполнитель ставит в очередь задания, если оно не завершено с текущим заданием, поэтому в таком случае вам не нужно ждать более ранних заданий, прежде чем отправлять новое задание.   -  person nos    schedule 02.07.2014
comment
Да, конечно, но сложность в том, что мой основной поток решил, когда отключить исполнителя, но я вижу, что, поскольку выполнение происходит когда-то в будущем, я не могу правильно ждать, используя avtiveCount, поэтому я закрываю исполнителя но все же некоторые из задач, которые я отправил, никогда не планировались.   -  person user996808    schedule 03.07.2014


Ответы (3)


Вероятно, вам следует использовать метод submit из ExecutorService для планирования задач. Вот рабочая программа, которая использует однопоточный исполнитель для запуска 10 задач. Я перешел к ThreadPoolExecutor, чтобы отслеживать состояние пула потоков. Вы можете дождаться одной задачи, вызвав get для соответствующего экземпляра Future, или дождаться всех задач, вызвав awaitTermination. Если вам не нужен результат Future, просто используйте Void. Надеюсь, поможет.

public class Main {                                                                                                                             
    static class TimingCallable implements Callable<Long> {                                                                                     
        static int MIN_WAIT = 200;                                                                                                              
        @Override                                                                                                                               
        public Long call() {                                                                                                                    
            long start = System.currentTimeMillis();                                                                                            
            try {                                                                                                                               
                Thread.sleep(MIN_WAIT + new Random().nextInt(300));                                                                             
            } catch (InterruptedException e) {                                                                                                  
                //DO NOTHING                                                                                                                    
            }                                                                                                                                   
            return System.currentTimeMillis() - start;                                                                                          
        }                                                                                                                                       
    }                                                                                                                                           

    public static void main(String[] args) throws InterruptedException, ExecutionException {                                                    

        ExecutorService executor =  Executors.newFixedThreadPool(1);                                                                            
        @SuppressWarnings("unchecked")                                                                                                          
        Future<Long>[] futureResults = new Future[10];                                                                                          
        for(int i =0; i < futureResults.length; i++) {                                                                                          
            futureResults[i] = executor.submit(new TimingCallable());                                                                           
            System.out.println(String.format("ActiveCount after submitting %d tasks: ", i+1) + ((ThreadPoolExecutor)executor).getActiveCount());
            System.out.println(String.format("Queue size after submitting %d tasks: ", i+1) + ((ThreadPoolExecutor)executor).getQueue().size());
        }                                                                                                                                       
        Thread.sleep(2000);                                                                                                                     
        System.out.println("ActiveCount after 2 seconds: " + ((ThreadPoolExecutor)executor).getActiveCount());                                  
        System.out.println("Queue size after 2 seconds: " + ((ThreadPoolExecutor)executor).getQueue().size());                                  
        for(int i =0; i < futureResults.length; i++) {                                                                                          
            if (futureResults[i].isDone()) {                                                                                                    
                System.out.println(String.format("%d task is done with execution time: ", i) + futureResults[i].get());                         
            }                                                                                                                                   
        }                                                                                                               //Waiting for the last task to finish
        System.out.println("Waiting for the last task result: " + futureResults[9].get());
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);                                  
    }                                                                                                                                           
}                                                                                                                                               
person zbig    schedule 02.07.2014

Если у вас есть только один поток для выполнения, просто используйте LinkedQueue для хранения заданий, как только поток завершит выполнение, тогда только он выберет другую задачу.

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());

Также у вас могут быть разные стратегии, если вы ограничиваете размер

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

Чтение отклоненных задач

person Abhishek    schedule 02.07.2014
comment
Но как я могу дождаться завершения первой задачи - person user996808; 02.07.2014

Служба исполнителя пула одного потока

По-видимому, вы хотите сразу запустить несколько задач, но в том порядке, в котором они были отправлены.

Довольно просто: используйте ссылку служба-исполнитель, поддерживаемая одним потоком. Исполнитель буферизует задачи, ожидая завершения более ранних. В одном потоке в пуле потоков одновременно может выполняться только одна задача, поэтому они будут выполняться последовательно в том порядке, в котором они были отправлены.

Executors предоставляет выбор из нескольких различных пулов потоков, поддерживающих службу-исполнитель. Вы хотите Executors.newSingleThreadExecutor().

ExecutorService es = Executors.newSingleThreadExecutor() ;

Отправьте серию Runnable или Callable объектов. Каждый представляет задачу, которую нужно выполнить.

es.submit(  ( ) -> System.out.println( "Hello. " + Instant.now() )  ) ;
es.submit(  ( ) -> System.out.println( "Bonjour. " + Instant.now() )  ) ;
es.submit(  ( ) -> System.out.println( "Aloha. " + Instant.now() )  ) ;
es.submit(  ( ) -> System.out.println( "Ciào. " + Instant.now() )  ) ;
es.submit(  ( ) -> System.out.println( "Shwmai. " + Instant.now() )  ) ;

При желании вы можете захватить Future, возвращаемый каждым вызовом submit, если вы хотите отслеживать выполнение задач. (не показано в коде выше)

См. этот выполнение кода на IdeOne.com.

Привет. 2019-11-29T09:10:13.426987Z

Бонжур. 2019-11-29T09:10:13.472719Z

Алоха. 2019-11-29T09:10:13.473177Z

Ciào. 2019-11-29T09:10:13.473479Z

Швмай. 2019-11-29T09:10:13.473974Z

person Basil Bourque    schedule 29.11.2019