Потоки парковки в сервисе

Я экспериментирую с парковкой потоков и решил создать какой-то сервис. Вот как это выглядит:

public class TestService {
    private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback I think this logger causes some troubles

    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelizm) {
        stopLatch = new CountDownLatch(parallelizm);
        workers = new Thread[parallelizm];
        for (int i = 0; i < parallelizm; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        boolean stoppedSuccefully = false;
        this.stopped = true;
        unparkWorkers();
        if (stopLatch.await(timeout, unit)) {
            stoppedSuccefully = true;
        }
        return stoppedSuccefully;
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}

Проблема, с которой я столкнулся, заключалась в том, что если я затем протестирую эту службу следующим образом:

public static void main(String[] args) = {
  while(true) {
    TestService service = new TestService(2);
    service.start();
    if (!service.stop(10000, TimeUnit.MILLISECONDS))
      throw new RuntimeException();
  }
}

Иногда у меня было следующее поведение:

14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
    at com.pack.age.Test$.main(Test.scala:12)
    at com.pack.age.Test.main(Test.scala)

Тема висит на парковке:

"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000720739a68> (a java.lang.Object)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.pack.age.TestService.lambda$new$0(TestService.java:27)
    at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

Я не вижу никакой гонки в парковке-разпарковке в сервисе. Более того, если unpark вызывается перед park, park гарантированно не блокируется (это то, что говорят javadocs).

Может быть, я неправильно использую LockSupport::park. Можете ли вы предложить какое-либо исправление?


person St.Antario    schedule 16.05.2018    source источник
comment
Кажется, я не получаю этого исключения; Я запускаю ваш код уже 3 минуты...   -  person daniu    schedule 16.05.2018
comment
@daniu Вы использовали log4j/logback?   -  person St.Antario    schedule 16.05.2018
comment
нет, на самом деле, java.util.logging (здесь нет log4j). Может быть, это действительно проблема (хотя я не понимаю, как).   -  person daniu    schedule 16.05.2018
comment
@daniu На самом деле я сначала попытался просто System.out.println все сообщения, и код работал нормально. После перехода на log4j/logback я начал решать эту проблему.   -  person St.Antario    schedule 16.05.2018
comment
@daniu Кажется, это из-за logback. Он использует ReentrantLock для синхронизации, которая, в свою очередь, использует поток парковки для ожидания (через AbstractQueuedSynchronizer). Таким образом, LockSupport::park кажется небезопасным для использования с каким-то чужеродным кодом, который может сам парковать / разблокировать потоки.   -  person St.Antario    schedule 16.05.2018
comment
Это странно, и если это связано с использованием ReentrantLock, это проблема с этим, а не с логбэком (который только вводит проблему в вашем случае) - так что это реальная проблема JDK. В конце концов, вы можете использовать ReentrantLock в других (не чужих) частях кода и вызывать такое же поведение.   -  person daniu    schedule 16.05.2018
comment
@daniu Меня смущает то, что unpark не принимает объект blocker. Просто нить.   -  person St.Antario    schedule 16.05.2018
comment
Ну, вы не сможете unpark() использовать блокирующий объект, если используете тот же самый (что и делаете вы). Но в целом я согласен с тем, что unpark() принимает Thread, тем более что предпочтительная обработка параллелизма в Java использует ExecutorServices и больше не предполагает прямого использования самого Thread. Это, вероятно, также способ решить вашу проблему.   -  person daniu    schedule 16.05.2018
comment
@daniu Я могу представить сценарий, когда мы распаковываем рабочего, чтобы получить разрешение. Затем этот рабочий блокирует ReentrantLock, поэтому разрешение теряется. И в следующий раз, когда мы припаркуем поток, он будет припаркован (разрешение было потеряно в ReentrantLock). Это правильно?   -  person St.Antario    schedule 16.05.2018
comment
@daniu Воспроизвел поведение, заменив вызовы журнала на lock.lock(); long l = 0; while(l++ < 100000000L){} lock.unlock();   -  person St.Antario    schedule 16.05.2018
comment
В LockSupport javadoc указываются базовые примитивы блокировки потоков для создания блокировок и других классов синхронизации. Я предполагаю, что урок заключается в том, чтобы не использовать его вместе с блокировками (а лучше оставаться на том же уровне синхронизации).   -  person daniu    schedule 16.05.2018
comment
@daniu Звучит разумно, спасибо.   -  person St.Antario    schedule 16.05.2018
comment
@daniu, я думаю, это немного проще.   -  person Eugene    schedule 11.04.2020


Ответы (1)


Это не имеет ничего общего с регистратором, хотя его использование выводит проблему на поверхность. У вас есть состояние гонки, вот и все. Прежде чем объяснять это состояние гонки, вам нужно сначала понять несколько вещей из LockSupport::unpark документации:

Делает доступным разрешение для данного потока, если оно еще не было доступно. Если поток был заблокирован при парковке, он разблокируется. В противном случае следующий вызов функции парковки гарантированно не будет заблокирован.

Первый момент объясняется здесь. Краткая версия: если у вас есть thread, который уже запущен, но еще не вызвал park, и в течение этого периода времени (между start потока и park) какой-то другой поток вызывает unpark для первого: этот поток вообще не будет парковаться. Разрешение будет доступно немедленно. Может быть, этот маленький рисунок прояснит ситуацию:

(ThreadA)  start ------------------ park --------- ....

(ThreadB)  start ----- unpark -----

Обратите внимание, как ThreadB звонит unpark(ThreadA) между периодом, когда ThreadA звонил start, и park. Таким образом, когда ThreadA достигает park: блокировка гарантированно не, как и сказано в документации.

Второй пункт из той же документации:

Эта операция не гарантирует никакого эффекта, если данный поток не был запущен.

Давайте посмотрим, что через рисунок:

Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 

После того, как ThreadA позвонит park, он зависнет навсегда, так как ThreadB больше никогда не позвонит unpark. Обратите внимание, что вызов unpark был сделан до запуска ThreadA (в отличие от предыдущего примера).

И это именно то, что происходит в вашем случае:

LockSupport.unpark(w); (от unparkWorkers) вызывается перед t.start(); от public void start(){...}. Проще говоря, ваш код вызывает unpark на обоих workers прежде чем они даже начнутся, поэтому, когда они в конечном итоге достигают park - они застревают, никто не может их unpark. Тот факт, что вы видите это с logger, а не с System::out, скорее всего, связан с тем, что когда вы println - под капотом есть метод synchronized.


На самом деле, LockSupport предлагает именно ту семантику, которая необходима для доказательства этого. Для этого нам понадобится (для простоты: SOProblem service = new SOProblem(1);)

static class ParkBlocker {

    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}

И теперь нам нужно вставить это в соответствующие методы. Сначала отметьте тот факт, что мы вызвали unpark:

private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}

Затем сбросьте флаг после завершения цикла:

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccefully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccefully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccefully;
}

Затем измените конструктор, чтобы отметить, что поток запущен:

  .....
  while (!stopped) {
       logger.debug("Parking " + Thread.currentThread().getName());
       // flag the fact that thread has started. add "2", meaning
       // thread has started
       int y = pb.x;
       y = y + 2;
       pb.x = y;
       LockSupport.park(pb);
       logger.debug(Thread.currentThread().getName() + " unparked");
  }

Затем, когда ваш поток зависает, вам нужно проверить флаг:

 public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}

где метод debug:

public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}

Когда проблема повторяется, вы вызывали unpark до вызова park, что происходит, когда x = 3 является выходом.

person Eugene    schedule 10.04.2020