500 рабочих потоков, какой пул потоков?

Мне интересно, лучший ли это способ сделать это. У меня есть около 500 потоков, которые работают бесконечно, но Thread.sleep на минуту, когда выполняется один цикл обработки.

   ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
   for (int i = 0; i < list.size(); i++) {
      es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
   }

Код, который выполняется, очень прост, и в основном это просто

class aThread extends Thread {
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         //Lots of computation every minute
      }
   }
}

Мне нужны отдельные потоки для каждой запущенной задачи, поэтому изменение архитектуры не вариант. Я попытался сделать размер своего threadPool равным Runtime.getRuntime(). AvailableProcessors(), который пытался запустить все 500 потоков, но позволил выполнить только 8 (4xhyperthreading) из них. Другие потоки не сдавались и позволяли другим потокам делать свою очередь. Я попытался ввести wait() и notify(), но все равно не повезло. Если у кого-то есть простой пример или несколько советов, я был бы признателен!

Ну, дизайн, возможно, несовершенен. Потоки реализуют генетическое программирование или GP, тип алгоритма обучения. Каждый поток анализирует передовые тенденции, делает прогнозы. Если поток когда-либо завершается, обучение теряется. Тем не менее, я надеялся, что sleep() позволит мне поделиться некоторыми ресурсами, пока один поток не «обучается».

Итак, фактические требования таковы.

как я могу запланировать задачи, которые сохраняют состояние и запускаются каждые 2 минуты, но контролируют, сколько выполняется одновременно.


person Submerged    schedule 19.05.2010    source источник
comment
Есть ли какая-то конкретная причина, по которой вы не можете выполнять их одновременно последовательно? То есть иметь 8 запущенных одновременно, и собирать результаты и др., когда они закончены. Проблема в том, что это все равно займет столько же времени, если вы сделаете это с 500 потоками или если вы сделаете это с 8 потоками.   -  person John Vint    schedule 19.05.2010
comment
Сколько времени занимает само вычисление? Сколько времени занимает вычисление, если выполняется в пределах 500 потоков?   -  person Romain Hippeau    schedule 19.05.2010
comment
Мне непонятно, что вы хотите сделать. Почему бы вам просто не попробовать запустить все свои потоки? Что-то вроде: for(int i....) { ((Thread)coreAppVector.elementAt(i)).start(); }   -  person Andrea Polci    schedule 19.05.2010
comment
Вычисление занимает около 2 секунд, но с 500 потоками возникает масса блокировок, а это означает, что некоторым потокам может потребоваться до 2 минут, чтобы фактически выполнить свою работу. Сейчас это кошмар, и я хочу сказать, что только 8 должны работать в любой момент времени, и как только они закончат свои 2 секунды, пусть запустятся еще 8 потоков и так далее.   -  person Submerged    schedule 19.05.2010
comment
Андреа: Запуск 500 потоков одновременно — это то, что я делаю сейчас, но похоже, что они борются за системные ресурсы (поскольку у меня только 4 физических ядра). Некоторые потоки могут быть настолько заблокированы, что их завершение занимает до двух минут. Возможно, это просто ограничение объема работы, которую необходимо выполнить, и вычислительной мощности для ее выполнения.   -  person Submerged    schedule 19.05.2010
comment
В частности, почему вы вызываете sleep()?   -  person Justin    schedule 19.05.2010
comment
Реальность здесь такова, что с 500 потоками потребуется больше времени, чем с 8 потоками. Всего с 8 отдельными процессорами будет огромное количество переключений контекста. Это приведет к тому, что каждые 2 секунды вычисления будут занимать гораздо больше времени.   -  person John Vint    schedule 19.05.2010
comment
500 потоков в вашем случае как минимум на 492 слишком много. Поскольку вы используете гиперпоточность, у вас даже нет 8 реальных процессоров. Я бы сказал, что используйте 4 потока в циклической конфигурации, и вы, вероятно, получите лучшую производительность.   -  person Daniel Pryden    schedule 19.05.2010
comment
Я понимаю, что мне следует использовать только 4 потока, но потоки должны выполнять блок кода ровно в минуту, что-то вроде опроса каждую минуту, если вы хотите так думать. Они спят, чтобы поддерживать этот интервал в одну минуту. Я не могу заставить их завершиться, тогда их внутренний цикл не будет запускать этот код каждую минуту... правильно ли я думаю, что, если этот метод run() никогда не завершится, никакие другие потоки не смогут начаться с threadPool размера 4 и 500 потоков ?   -  person Submerged    schedule 19.05.2010
comment
@Submerged - есть и другие способы поддерживать интервал в одну минуту (кстати, у вас нет интервала в одну минуту, у вас есть интервал в одну минуту + время вычисления). Избавьтесь от сна и используйте законный механизм планирования.   -  person erickson    schedule 19.05.2010
comment
@Подводный. Если вы используете ExecutorService, каждый runnable, который работает вечно, никогда не завершится (как вы можете себе представить), и пул потоков никогда не делегирует следующую задачу свободному потоку (потому что его нет).   -  person John Vint    schedule 19.05.2010
comment
Хорошо, как я и предполагал. Проблема действительно в этом. Позвольте мне дать вам представление о том, что делает программа, чтобы представить ее в перспективе. Каждый поток выполняет генетическую обработку, в основном алгоритм обучения для анализа тенденций. Если поток завершается, это обучение также теряется. По этой причине каждый поток никогда не может закончиться, а скорее Sleep() в надежде отказаться от некоторых своих ресурсов... к сожалению, кажется, что это невозможно, поскольку он никогда не завершается.   -  person Submerged    schedule 19.05.2010
comment
@Submereged Вы можете использовать то, что предложил Эриксон, с помощью ScheudledExecutorService, сохранить это состояние в рабочем состоянии, и вы сможете сохранить информацию.   -  person John Vint    schedule 19.05.2010
comment
Но абсолютно ли необходимо, чтобы они были объектами Thread? Вы можете сделать их доступными для выполнения и позволить им сохранить это важное внутреннее состояние, а затем вызывать запуск по мере необходимости в ряде потоков, управляемых для вашей системы. (которых может быть больше 4 или 8, если часть задачи блокируется в сети/базе данных/независимо от ввода-вывода).   -  person Affe    schedule 19.05.2010
comment
Это как раз может быть то, что я ищу. Всем спасибо! Я отчитаюсь завтра, так как сейчас 17:00, и дам вам знать, как я разобрался. Как всегда отличная помощь!!   -  person Submerged    schedule 19.05.2010
comment
Тот факт, что имеется только восемь (логических) ядер, не означает, что наилучшая производительность достигается за счет восьми потоков. Если потоки привязаны к процессору, то восемь будет ближе к правому краю, но если потоки связаны с вводом-выводом, то может иметь смысл иметь гораздо больше потоков. Согласитесь с общей идеей, что вы должны быть осторожны, чтобы контролировать переключение контекста.   -  person    schedule 19.05.2010
comment
@fuzzy lollipop Конечно, runnable может сохранять состояние. Если у вас есть коллекция типа класса, который реализует runnable с некоторым состоянием и выполняет эти runnables, эти runnables будут сохранять состояние при каждом запуске.   -  person John Vint    schedule 20.05.2010
comment
Спасибо, Джон, это то, что я хотел услышать.   -  person Submerged    schedule 20.05.2010
comment
Для тех, кто читает это в будущем. Я реализовал свой принятый ответ, и как только я действительно реализовал Runnable и позволил run() закончить, я увидел увеличение скорости на 50-75%. Всем спасибо   -  person Submerged    schedule 31.05.2010


Ответы (11)


Почему бы не использовать ScheduledExecutorService для запланировать запуск каждой задачи один раз в минуту вместо того, чтобы оставлять все эти потоки бездействующими на целую минуту?

ScheduledExecutorService workers = 
  Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) { 
  workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}

Что вы подразумеваете под «изменение архитектуры не вариант»? Если вы имеете в виду, что вы не можете изменить свою задачу вообще (в частности, задачи должны выполняться в цикле, а не выполняться один раз, и вызов Thread.sleep() не может быть удален), тогда "хорошая производительность тоже не вариант».

person erickson    schedule 19.05.2010
comment
Ясно, что он не должен использовать потоки, он должен просто реализовать Runnable, а каждый run() должен завершаться после одного вычисления. - person erickson; 19.05.2010
comment
Это было бы лучшим решением, которое ищет OP, если каждый runnable не находится в вечном цикле и выходит из метода запуска после завершения одного запуска. - person John Vint; 19.05.2010

Если ваши потоки не завершаются, это ошибка кода внутри потока, а не пула потоков. Для получения более подробной справки вам необходимо опубликовать исполняемый код.

Кроме того, почему вы усыпляете каждый поток, когда он сделан; не лучше ли просто дать ему завершиться?

Кроме того, я думаю, что вы неправильно используете пул потоков, имея количество потоков, равное количеству задач, которые вы хотите выполнить. Смысл пула потоков в том, чтобы наложить ограничение на количество используемых ресурсов; этот подход не лучше, чем вообще не использовать пул потоков.

Наконец, вам не нужно передавать экземпляры Thread вашему ExecutorService, только экземпляры Runnable. ExecutorService поддерживает свой собственный пул потоков, которые зацикливаются на неопределенный срок, вытягивая работу из внутренней очереди (работа — это Runnable, которые вы отправляете).

person danben    schedule 19.05.2010
comment
Правильный. Исполнитель понятия не имеет, что Runnable спит. Пока Runnable не будет выполнен, исполнитель считает, что он работает. - person Steve Kuo; 19.05.2010

Я не уверен, что ваш код семантически корректен в том, как он использует пул потоков. ExecutionService создает и управляет потоками внутри, клиент должен просто предоставить экземпляр Runnable, чей метод run() будет выполняться в контексте одного из потоков в пуле. Вы можете проверить мой пример. Также обратите внимание, что каждый запущенный поток занимает около 10 МБ системной памяти для стека, а в Linux сопоставление потоков java-to-native выполняется 1-к-1.

person bobah    schedule 19.05.2010

Вместо того, чтобы усыплять протектор, вы должны позволить ему вернуться и использовать ThreadPooleexecutor для выполнения работы, отправляемой каждую минуту в вашу рабочую очередь.

person Eugene Kuleshov    schedule 19.05.2010

Чтобы ответить на ваш вопрос, какой тип пула потоков?

Я разместил свои комментарии, но это действительно должно решить вашу проблему. У вас есть вычисление, которое может занять 2 секунды. У вас много задач (500), которые вы хотите выполнить как можно быстрее. Максимально возможная пропускная способность, которую вы можете достичь, при условии отсутствия операций ввода-вывода и/или сетевого трафика, достигается при Runtime.getRuntime().availableProcessors() числе потоков.

Если вы увеличите свое число до 500 потоков, то каждая задача будет выполняться в своем собственном потоке, но ОС будет время от времени планировать поток, чтобы передать его другому потоку. Это 125 переключений контекста в любой момент. Каждое переключение контекста увеличивает время выполнения каждой задачи.

Общая картина здесь заключается в том, что добавление большего количества потоков НЕ приводит к увеличению пропускной способности, когда количество процессоров превышает количество.

Изменить: Быстрое обновление. Вам не нужно спать здесь. Когда вы выполняете 500 задач с 8 процессорами, каждая задача будет завершена за 2 секунды, завершится, и поток, в котором она выполнялась, затем возьмет следующую задачу и завершит ее.

person John Vint    schedule 19.05.2010

8 потоков - это максимум, с которым может справиться ваша система, и вы замедляете себя с переключением контекста.

Посмотрите эту статью http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4 Это даст вам общее представление о том, как это сделать.

person Romain Hippeau    schedule 19.05.2010
comment
Это предполагает обработку с привязкой к процессору. Неправильно, если потоки привязаны к вводу-выводу. - person ; 19.05.2010
comment
@willie Wheeler - В комментариях к вопросу он говорит, что выполняет вычисления. Вероятно, можно с уверенностью предположить, что он привязан к процессору. Вы читали или просто звоните? - person Romain Hippeau; 19.05.2010

Это должно делать то, что вы хотите, но не то, о чем вы просили :-) Вы должны вынуть Thread.sleep()

ScheduledRunnable.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledRunnable
{
    public static void main(final String[] args)
    {
        final int numTasks = 10;
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < numTasks; i++)
        {
            ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
        }
    }

    private static class MyRunnable implements Runnable
    {
        private int id;
        private int numRuns;

        private MyRunnable(final int id)
        {
            this.id = id;
            this.numRuns = 0;
        }

        @Override
        public void run()
        {
            this.numRuns += 1;
            System.out.format("%d - %d\n", this.id, this.numRuns);
        }
    }
}

Это планирует Runnables каждые 10 СЕКУНД, чтобы показать поведение. Если вам действительно нужно подождать определенное время ПОСЛЕ завершения обработки, возможно, вам придется поиграть с тем, какой .scheduleXXX метод вам нужен. Я думаю, что fixedWait будет просто запускать его каждые N раз, независимо от времени выполнения.

person Community    schedule 19.05.2010

Мне нужны отдельные потоки для каждой запущенной задачи, поэтому изменение архитектуры не вариант.

Если это правда (например, вызов внешней блокирующей функции), то создайте для них отдельные потоки и запустите их. Вы не можете создать пул потоков с ограниченным числом потоков, так как блокирующая функция в одном из потоков предотвратит попадание в него любого другого исполняемого объекта и не принесет много пользы при создании пула потоков с одним потоком на задачу.

Я попытался сделать размер своего threadPool равным Runtime.getRuntime(). AvailableProcessors(), который пытался запустить все 500 потоков, но позволил выполнить только 8 (4xhyperthreading) из них.

Когда вы передаете создаваемые вами объекты Thread в пул потоков, он видит только то, что они реализуют Runnable. Поэтому он будет запускать каждый Runnable до завершения. Любой цикл, который останавливает возврат метода run(), не позволит запустить следующую поставленную в очередь задачу; например:

public static void main (String...args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    for (int i = 0; i < 10; ++i) {
        final int task = i;

        executor.execute(new Runnable () {
        private long lastRunTime = 0;
            @Override
            public void run () {

                for (int iteration = 0; iteration < 4; )
                {
                    if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
                    {
                        // do your work here
                        ++iteration;
                        System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());

                        this.lastRunTime = System.currentTimeMillis();
                    }
                    else
                    {
                        Thread.yield(); // otherwise, let other threads run
                    }
                }
            }
        });
    }

    executor.shutdown();
}

распечатывает:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...

показывая, что первые задачи (размер пула потоков) выполняются до завершения до того, как будут запланированы следующие задачи.

Что вам нужно сделать, так это создать задачи, которые выполняются какое-то время, а затем позволить другим задачам выполняться. То, как вы их структурируете, зависит от того, чего вы хотите достичь.

  • хотите ли вы, чтобы все задачи выполнялись одновременно, все ждали минуту, затем снова все выполнялись одновременно, или задачи не синхронизируются друг с другом
  • действительно ли вы хотели, чтобы каждая задача выполнялась с интервалом в одну минуту
  • независимо от того, потенциально ли ваши задачи блокируются или нет, и поэтому действительно требуют отдельных потоков
  • какое поведение ожидается, если задача блокируется дольше ожидаемого окна для запуска
  • какое поведение ожидается, если задача блокируется дольше, чем частота повторения (блокируется более одной минуты)

В зависимости от ответов на них для координации задач может использоваться некоторая комбинация ScheduledExecutorService, семафоров или мьютексов. Простейшим случаем являются неблокирующие, несинхронные задачи, и в этом случае используйте ScheduledExecutorService напрямую для запуска ваших runnables каждую минуту.

person Pete Kirkham    schedule 19.05.2010
comment
Как уже было сказано, обратите внимание, что использование yield() приводит к занятому ожиданию, и его следует избегать. - person Andrea Polci; 19.05.2010
comment
Я думал, ты сказал, что это не было занято ожиданием, так что же это? - person ; 20.05.2010

Можете ли вы переписать свой проект для использования какой-либо среды параллелизма на основе агентов, например Akka?

person Igor Artamonov    schedule 19.05.2010

Вы, безусловно, можете улучшить производительность, уменьшив количество потоков до того, с чем система может реально справиться. Готовы ли вы немного изменить дизайн треда? Это избавит планировщик от необходимости помещать спящие потоки в очередь вместо того, чтобы иметь сотни спящих потоков.

class RepeatingWorker implements Runnable {

private ExecutorService executor;
private Date lastRan;

//constructor takes your executor

@Override
public void run() {

  try {
    if (now > lastRan + ONE_MINUTE) {
      //do job
      lastRan = now;
    } else {
      return;
  } finally {
    executor.submit(this);
  }
}
}

Это сохраняет вашу основную семантику «задание повторяется бесконечно, но ждет не менее одной минуты между выполнениями», но теперь вы можете настроить пул потоков на то, что машина может обработать, а те, которые не работают, находятся в очереди, а не слоняются. в планировщике как спящие потоки. Существует некоторое поведение ожидания, если никто на самом деле ничего не делает, но я предполагаю из вашего поста, что вся цель приложения состоит в том, чтобы запускать эти потоки, и в настоящее время оно ограничивает ваши процессоры. Возможно, вам придется настроить это, если нужно освободить место для других вещей :)

person Affe    schedule 19.05.2010

Вам нужен семафор.

class AThread extends Thread {
   Semaphore sem;
   AThread(Semaphore sem) {
     this.sem = sem;
   }
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         sem.acquire();
         try {
           //Lots of computation every minute
         } finally {
           sem.release();
         }
      }
   }
}

При создании экземпляра AThreads вам необходимо передать один и тот же экземпляр семафора:

Semaphore sem = new Semaphore(MAX_AVAILABLE, true);

Редактировать: кто проголосовал против, пожалуйста, объясните, почему? Что-то не так в моем решении?

person Andrea Polci    schedule 19.05.2010
comment
ОП ничего не упомянул о синхронизации. Это совершенно не по теме. - person SimonC; 26.02.2011
comment
ОП спросила, как обеспечить, чтобы в данный момент было активно только фиксированное количество потоков, и это проблема синхронизации. Может быть, мое решение не является оптимальным (мне нравится принятое решение), наверняка оно не не по теме, потому что решает проблему. - person Andrea Polci; 20.04.2011