Зацикливание потоков Java с использованием CyclicBarrier

У меня есть программа с этой общей структурой:

init
create CyclicBarrier
initialise all threads, attaching to barrier
*start all threads*
wait for join
display stats


*start all threads*
perform calculation
await barrier

Моя проблема в том, что мне нужен метод run() потоков, чтобы продолжать цикл до тех пор, пока не будет выполнено определенное условие, но с паузой после каждой итерации, чтобы синхронизировать все потоки.

Я уже пробовал прикрепить метод Runnable к барьеру, но это требует воссоздания и перезапуска каждого потока, что не очень хорошее решение.

Я также пытался использовать метод reset() CyclicBarrier, но это, похоже, вызывает ошибки в существующих потоках, даже если они выполняются после завершения всех потоков.

Мой вопрос:

-Можно ли «сбросить» барьер и заставить все потоки барьера следовать тем же условиям, что и до первых вызовов await()?

-Или есть другой метод, который я должен использовать для достижения этой цели?

заранее спасибо


person dylanslewis    schedule 05.05.2014    source источник
comment
Вставьте while(true) в Thread. await() барьер в конце цикла. Когда все потоки выполнены, тогда reset() барьер и все начинается снова...   -  person Boris the Spider    schedule 05.05.2014


Ответы (3)


После ответа @Totoro ниже приведен небольшой пример кода, который также включает требование «Мне нужен метод run() потоков, чтобы продолжать цикл до тех пор, пока не будет выполнено определенное условие, делая паузу после каждой итерации, чтобы все потоки синхронизировались». Это делает его довольно сложным, но, надеюсь, вывод программы прояснит код примера (или я просто должен сделать лучшие примеры).

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierCalc implements Runnable {

public static final int CALC_THREADS = 3;

private static final AtomicBoolean runCondition = new AtomicBoolean();
private static final AtomicBoolean stopRunning = new AtomicBoolean();

public static void main(String[] args) {

    CyclicBarrier barrier = new CyclicBarrier(CALC_THREADS + 1);
    for (int i = 0; i < CALC_THREADS; i++) {
         new Thread(new BarrierCalc(barrier)).start();
    }
    try {
        runCondition.set(true);
        barrier.await();
        showln(0, "STATS!");

        barrier.await();
        showln(0, "start looping 1");
        Thread.sleep(200);
        runCondition.set(false);
        showln(0, "stop looping 1");
        barrier.await();
        runCondition.set(true);

        barrier.await();
        showln(0, "start looping 2");
        Thread.sleep(100);
        runCondition.set(false);
        showln(0, "stop looping 2");
        barrier.await();

        stopRunning.set(true);
        showln(0, "finishing");
        barrier.await();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static final AtomicInteger calcId = new AtomicInteger();

private CyclicBarrier barrier;
private int id;

public BarrierCalc(CyclicBarrier barrier) {
    this.barrier = barrier;
    id = calcId.incrementAndGet();
}

public void run() {

    showln(id, "waiting for start");
    try {
        barrier.await(); // display stats
        barrier.await(); // start running
        int loopNumber = 0;
        while (!stopRunning.get()) {
            showln(id, "looping " + (++loopNumber));
            while (runCondition.get()) {
                Thread.sleep(10); // simulate looping
            }
            showln(id, "synchronizing " + loopNumber);
            barrier.await();
            showln(id, "synchronized " + loopNumber);
            // give main thread a chance to set stopCondition and runCondition
            barrier.await();
        }
        showln(id, "finished");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static final long START_TIME = System.currentTimeMillis();

public static void showln(int id, String msg) {
    System.out.println((System.currentTimeMillis() - START_TIME) + "\t ID " + id + ": " + msg);
}

}

Имейте в виду, что вывод программы может быть не в ожидаемом порядке: потоки, одновременно записывающие данные в один синхронизированный вывод (System.out), получают доступ для записи в случайном порядке.

person vanOekel    schedule 05.05.2014

Barrier.wait() приостановит потоки. Барьер уже есть в основном потоке, другого ему не нужно. В приведенном выше алгоритме вы показываете, что потоки перезапускаются после отображения статистики. Вам не нужно этого делать. Если недавно пробужденные потоки находятся в цикле, они снова вернутся в барьер. ожидание().

person Totoro    schedule 05.05.2014

Можете посмотреть на мой пример, где я играл с CyclicBarrier. Здесь каждый воркер делает какой-то расчет и на барьере проверяется условие. Если выполняется условие, то все воркеры останавливают расчеты, иначе продолжают:

class Solver {
    private static final int REQUIRED_AMOUNT = 100;
    private static final int NUMBER_OF_THREADS = 4;

    AtomicInteger atomicInteger = new AtomicInteger();
    AtomicBoolean continueCalculation = new AtomicBoolean(true);
    final CyclicBarrier barrier;

    public static void main(String[] args) {
        new Solver();
    }

    class Worker implements Runnable {
        int workerId;
        Worker(int workerId) {
            this.workerId = workerId;
        }

        public void run() {
            try {
                while(continueCalculation.get()) {
                    calculate(workerId);
                    barrier.await();
                }

            } catch (Exception ex) {
                System.out.println("Finishing " + workerId);
            }
        }
    }

    public Solver() {
        Runnable barrierAction = () -> {
            if (done()) {
                continueCalculation.set(false);
            }
        };

        barrier = new CyclicBarrier(NUMBER_OF_THREADS, barrierAction);

        List<Thread> threads = new ArrayList(NUMBER_OF_THREADS);
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }
    }

    private void calculate(int workerId) throws InterruptedException {
        // Some long-running calculation
        Thread.sleep(2000L);
        int r = new Random().nextInt(12);

        System.out.println("Worker #" + workerId + " added " + r +" = " + atomicInteger.addAndGet(r));
    }

    private boolean done() {
        int currentResult = atomicInteger.get();
        boolean collected = currentResult >= REQUIRED_AMOUNT;

        System.out.println("=======================================================");
        System.out.println("Checking state at the barrier: " + currentResult);
        if (collected) {
            System.out.println("Required result is reached");
        }
        System.out.println("=======================================================");

        return collected;
    }
}
person ema    schedule 05.10.2019