Сбрасываемый обратный отсчет

Мне нужно что-то, что прямо эквивалентно CountDownLatch, но может быть сброшено (остается потокобезопасным!). Я не могу использовать классические конструкции синхронизации, так как они просто не работают в этой ситуации (сложные проблемы с блокировкой). На данный момент я создаю много CountDownLatch объектов, каждый из которых заменяет предыдущий. Я считаю, что это происходит с молодым поколением в GC (из-за огромного количества объектов). Вы можете увидеть код, который использует защелки ниже (это часть макета java.net для интерфейса сетевого симулятора ns-3).

Некоторые идеи могут заключаться в том, чтобы попробовать CyclicBarrier (JDK5 +) или Phaser (JDK7)

Я могу протестировать код и вернуться к любому, кто найдет решение этой проблемы, так как я единственный, кто может вставить его в работающую систему, чтобы увидеть, что произойдет :)

/**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

/**
 * KSelector
 * @version 1.0
 * @author Chris Dennett
 */
public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    class SocketListener implements KKSSocketListener {
        protected volatile CountDownLatch latch = null;

        /**
         *
         */
        public SocketListener() {
            newLatch();
        }

        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            synchronized (socketToChannel) {
                SelChImpl ch = socketToChannel.get(socket);
                if (ch == null) {
                    System.out.println("ks sendCB: channel not found for socket: " + socket);
                    return;
                }
                synchronized (channelToKey) {
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }
        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }
        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }
        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }
        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();
    protected Thread selectingThread = null;

    SelChImpl getChannelForSocket(KKSSocket s) {
        synchronized (socketToChannel) {
            return socketToChannel.get(s);
        }
    }

    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        synchronized (channelToKey) {
            return channelToKey.get(s);
        }
    }

    protected boolean markRead(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markWrite(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markAccept(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markConnect(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT);
            return selectedKeys.add(impl);
        }
    }

    /**
     * @param provider
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (closed) return;
            closed = true;
            for (SelectionKey sk : keys) {
                provider().getApp().printMessage("dereg1");
                deregister((AbstractSelectionKey)sk);
                provider().getApp().printMessage("dereg2");
                SelectableChannel selch = sk.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
            implCloseInterrupt();
        }
    }

    protected void implCloseInterrupt() {
        wakeup();
    }

    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            boolean notify = false;

            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            SelectionKeyImpl ski = (SelectionKeyImpl)k;

            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0) {
                if (ski.channel.socket().getRxAvailable() > 0) {
                    notify |= markRead(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0) {
                if (ski.channel.socket().getTxAvailable() > 0) {
                    notify |= markWrite(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0) {
                if (!ski.channel.socket().isConnectionless()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (!ski.channel.socket().isAccepting() && !cs.isConnecting() && !cs.isConnected()) {
                        notify |= markConnect(ski);
                    }
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0) {
                //provider().getApp().printMessage("accept check: ski: " + ski + ", connectionless: " + ski.channel.socket().isConnectionless() + ", listening: " + ski.channel.socket().isListening() + ", hasPendingConn: " + (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections()));
                if (!ski.channel.socket().isConnectionless() && ski.channel.socket().isListening()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (cs.hasPendingConnections()) {
                        notify |= markAccept(ski);
                    }
                }
            }
            return notify;
        }
    }

    private boolean handleSelect() {
        boolean notify = false;

        // get initial status
        for (SelectionKey k : keys) {
            notify |= handleSelect(k);
        }

        return notify;
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            synchronized (currListener) {
                wakeup = false;
                selectingThread = Thread.currentThread();
                selecting = true;
            }
            try {
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {

                        }
                    }
                    return selectedKeys.size();
                } finally {
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                }
            } finally {
                synchronized (currListener) {
                    selecting = false;
                    selectingThread = null;
                    wakeup = false;
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (currListener) {
            if (selecting) {
                wakeup = true;
                selecting = false;
                selectingThread.interrupt();
                selectingThread = null;
            }
        }
        return this;
    }
}

Ура,
Крис


person Chris Dennett    schedule 06.07.2011    source источник
comment
Этот ответ может помочь вам http://stackoverflow.com/questions/4168772/java-concurrency-countdown-latch-vs-cyclic-barrier   -  person Boris Pavlović    schedule 06.07.2011
comment
Точно сказать не могу. Я нахожусь в той же ситуации, что и ответ Ким на этот вопрос.   -  person Chris Dennett    schedule 06.07.2011


Ответы (8)


Я скопировал CountDownLatch и реализовал reset() метод, который сбрасывает внутренний класс Sync в его начальное состояние (начальный счетчик) :) Кажется, работает нормально. Больше нет ненужного создания объекта \ o / Невозможно создать подкласс, потому что sync был частным. Бу.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 *
 * <p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * </ul>
 *
 * <pre>
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *      this.startSignal = startSignal;
 *      this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *      try {
 *        startSignal.await();
 *        doWork();
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Another typical usage would be to divide a problem into N parts,
 * describe each part with a Runnable that executes that portion and
 * counts down on the latch, and queue all the Runnables to an
 * Executor.  When all sub-parts are complete, the coordinating thread
 * will be able to pass through await. (When threads must repeatedly
 * count down in this way, instead use a {@link CyclicBarrier}.)
 *
 * <pre>
 * class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *      this.doneSignal = doneSignal;
 *      this.i = i;
 *   }
 *   public void run() {
 *      try {
 *        doWork(i);
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code countDown()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions following a successful return from a corresponding
 * {@code await()} in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ResettableCountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        public final int startCount;

        Sync(int count) {
            this.startCount = count;
            setState(startCount);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

        public void reset() {
             setState(startCount);
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void reset() {
        sync.reset();
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * <p>If the current count is zero then this method returns immediately
     * with the value {@code true}.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of three things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     *
     * <p>If the count reaches zero then the method returns with the
     * value {@code true}.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.  If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false}
     *         if the waiting time elapsed before the count reached zero
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Returns a string identifying this latch, as well as its state.
     * The state, in brackets, includes the String {@code "Count ="}
     * followed by the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}
person Chris Dennett    schedule 06.07.2011

Основываясь на ответе @Fidel -s, я произвел замену ResettableCountDownLatch. Изменения, которые я сделал

  • mLatch is private volatile
  • mInitialCount is private final
  • тип возврата простого await() изменен на void.

В остальном исходный код тоже крутой. Итак, это полный расширенный код:

public class ResettableCountDownLatch {

    private final int initialCount;
    private volatile CountDownLatch latch;

    public ResettableCountDownLatch(int  count) {
        initialCount = count;
        latch = new CountDownLatch(count);
    }

    public void reset() {
        latch = new CountDownLatch(initialCount);
    }

    public void countDown() {
        latch.countDown();
    }

    public void await() throws InterruptedException {
        latch.await();
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}

Обновлять

На основании комментария @Systemplanet -s, вот более безопасная версия reset():

    // An atomic reference is required because reset() is not that atomic anymore, not even with `volatile`.
    private final AtomicReference<CountDownLatch> latchHolder = new AtomicReference<>();

    public void reset() {
        // obtaining a local reference for modifying the required latch
        final CountDownLatch oldLatch = latchHolder.getAndSet(null);
        if (oldLatch != null) {
            // checking the count each time to prevent unnecessary countdowns due to parallel countdowns
            while (0L < oldLatch.getCount()) {
                oldLatch.countDown();
            }
        }
    }

По сути, это выбор между простотой и безопасностью. Т.е. если вы готовы переложить ответственность за свой код на клиента, то достаточно установить ссылку null в reset().

С другой стороны, если вы хотите упростить пользователям этот код, вам нужно использовать еще немного уловок.

person Tamas Rev    schedule 27.10.2016
comment
любой поток, ожидающий более старого экземпляра защелки, зависнет. Вы можете изменить на ‹pre› public void reset () {if (latch! = Null) для (int i = 0; i ‹initialCount; i ++) latch.countDown (); защелка = новый CountDownLatch (initialCount); } ‹/Pre› - person Systemsplanet; 29.07.2017
comment
@Systemsplanet спасибо за комментарий! Я добавил более безопасную версию reset() с множеством других улучшений. - person Tamas Rev; 30.07.2017
comment
@TamasRev, о: latchHolder.getAndSet (null); Разве это не должна быть новая защелка вместо нуля? - person Michał Szkudlarek; 09.05.2018
comment
@ MichałSzkudlarek хм, наверное, final CountDownLatch oldLatch = latchHolder.getAndSet(new CountDownLatch(initialCount)); было бы лучше. Мне нужно подумать обо всех возможностях, чтобы быть уверенным. - person Tamas Rev; 09.05.2018

Я не уверен, что это фатально ошибочно, но недавно у меня была такая же проблема, и я решил ее, просто создавая экземпляр нового объекта CountDownLatch каждый раз, когда я хотел выполнить сброс. Что-то вроде этого:

Официант:

bla();
latch.await();
//now the latch has counted down to 0
blabla();

CountDowner

foo();
latch.countDown();
//now the latch has counted down to 0
latch = new CountDownLatch(1);
Waiter.receiveReferenceToNewLatch(latch);
bar();

Очевидно, это тяжелая абстракция, но до сих пор она у меня работала и не требует от вас возиться с определениями классов.

person Uzebeckatrente    schedule 13.06.2018
comment
Это может пострадать из-за проблем с безопасностью памяти в многопоточных средах (разные ядра процессора могут видеть разные CountDownLatch экземпляры в любой момент времени). Вы должны заключить CountDownLatch ссылку в AtomicReference. - person Luke Hutchison; 19.02.2019

Phaser имеет больше возможностей, мы может реализовать сбрасываемый countdownLatch, используя это.

Прочтите ниже основные концепции со следующих сайтов

https://examples.javacodegeeks.com/core-java/util/concurrent/phaser/java-util-concurrent-phaser-example/

http://netjs.blogspot.in/2016/01/phaser-in-java-concurrency.html

import java.util.concurrent.Phaser;
/**
 * Resettable countdownLatch using phaser
 */
public class PhaserExample {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3); // you can use constructor hint or
                                        // register() or mixture of both
        // register self... so parties are incremented to 4 (3+1) now
        phaser.register();
        //register is one time call for all the phases.
        //means no need to register for every phase             


        int phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);

        // similar to await() in countDownLatch/CyclicBarrier
        // parties are decremented to 3 (4+1) now
        phaser.arriveAndAwaitAdvance(); 
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

        //second phase
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);
        phaser.arriveAndAwaitAdvance(); 
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

    }

    private void testPhaser(final Phaser phaser, final int sleepTime) {
        // phaser.register(); //Already constructor hint is given so not
        // required
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(sleepTime);
                    System.out.println(Thread.currentThread().getName() + " arrived");
                    // phaser.arrive(); //similar to CountDownLatch#countDown()
                    phaser.arriveAndAwaitAdvance();// thread will wait till Barrier opens
                    // arriveAndAwaitAdvance is similar to CyclicBarrier#await()
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " after passing barrier");
            }
        }.start();
    }
}
person Kanagavelu Sugumar    schedule 28.04.2017

Еще одна простая замена

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ResettableCountDownLatch {
    int mInitialCount;
    CountDownLatch mLatch;

    public ResettableCountDownLatch(int  count) {
        mInitialCount = count;
        mLatch = new CountDownLatch(count);
    }

    public void reset() {
        mLatch = new CountDownLatch(mInitialCount);
    }

    public void countDown() {
        mLatch.countDown();
    }

    public boolean await() throws InterruptedException {
        boolean result = mLatch.await();
        return result;
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        boolean result = mLatch.await(timeout, unit);
        return result;
    }
}
person Fidel    schedule 12.08.2015
comment
Это не является потокобезопасным, потому что в модели памяти Java переназначение mLatch может не быть мгновенно видимым для других потоков, если оно не отмечено как изменчивое. - person bcoughlan; 19.08.2015
comment
+1 к комментарию @bcoughlan. Это может пострадать из-за проблем с безопасностью памяти в многопоточных средах (разные ядра процессора могут видеть разные CountDownLatch экземпляры в любой момент времени). Вы должны заключить CountDownLatch ссылку в AtomicReference. - person Luke Hutchison; 19.02.2019

Похоже, вы хотите превратить асинхронный ввод-вывод в синхронный. Вся идея использования асинхронного ввода-вывода состоит в том, чтобы избежать потоков, но CountDownLatch требует использования потоков. Это очевидное противоречие в вашем вопросе. Так что вы можете:

  • продолжайте использовать потоки и используйте синхронный ввод-вывод вместо селекторов и суффиксов. Это будет намного проще и надежнее
  • продолжайте использовать асинхронный ввод / вывод и откажитесь от CountDownLatch. Тогда вам понадобится асинхронная библиотека - посмотрите RxJava, Akka или df4j.
  • продолжайте развивать свой проект в свое удовольствие. Затем вы можете попробовать использовать java.util.Semaphore вместо CountDownLatch или запрограммировать свой собственный класс синхронизации с помощью synchronized / wait / notify.
person Alexei Kaigorodov    schedule 16.08.2018

public class ResettableLatch {
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return getState() == 0 ? 1 : -1;
    }

    public void reset(int count) {
        setState(count);
    }

    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public ResettableLatch(int count) {
    if (count < 0)
        throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
    sync.releaseShared(1);
}

public long getCount() {
    return sync.getCount();
}

public void reset(int count) {
    sync.reset(count);
}
}

Это сработало для меня.

person jaidevchattre    schedule 16.11.2019
comment
просто добавил метод сброса защелки, а остальное такое же, как CountDownLatch - person jaidevchattre; 16.11.2019

Из того, что я смог понять из объяснения OP и исходного кода, сбрасываемый CountDownLatch не совсем адекватная концепция для проблемы, которую он собирался решить. Документация самой CountDownLatch упоминает вариант использования OP как простые ворота, инициализируемые счетчиком до единицы:

CountDownLatch, инициализированный счетчиком, равным единице, служит простой защелкой включения / выключения или вентилем: все потоки, вызывающие await, ждут у шлюза, пока он не будет открыт потоком, вызывающим countDown.

, но реализация CountDownLatch не идет дальше в этом направлении.

Итак, у меня возникла проблема, аналогичная проблеме OP, я решил ввести класс SimpleGate со следующими свойствами:

  • Количество разрешений - одно, что означает, что оно может находиться в состоянии On или Off;

  • Существует специальный поток, называемый Gate Keeper, который разрешен только shut off или open up воротам;

  • Право на охрану ворот передается;

  • открытие ворот немедленно позволяет потокам, которые пытались come through сделать это (эта очень логичная особенность была упущена из виду в других ответах);

  • поскольку ожидается высокая конкуренция потоков, справедливость поддерживается в качестве опции, это позволяет уменьшить эффект перебоев потоков.

      public class SimpleGate {
    
          private static class Sync extends AbstractQueuedSynchronizer {
    
             // State
             private static final int SHUT = 1;
             private static final int OPEN = 0;
    
             private boolean fair;
    
             public void setFair(boolean fair) {
                this.fair = fair;
             }
    
             public void shutOff() {
                super.setState(SHUT);
             }
    
             @Override
             protected int tryAcquireShared(int arg) {
                if (fair && super.hasQueuedPredecessors())
                   return -1;
                return super.getState() == OPEN ? 1 : -1;
             }
    
             @Override
             protected boolean tryReleaseShared(int arg) {
                super.setState(OPEN); 
                return true;
             }
    
        }
    
        private Sync sync = new Sync();
        private volatile Thread gateKeeper = Thread.currentThread();
    
        public SimpleGate(){
           this(true);
        }
    
        public SimpleGate(boolean shutOff){
           this(shutOff, false);
        }
    
        public SimpleGate(boolean shutOff, boolean fair){
           if (shutOff)
              sync.shutOff();
           sync.setFair(fair);
        }
    
        public void comeThrough(){
           if (Thread.currentThread() == gateKeeper)
              throw new IllegalStateException("Gate Keeper thread is not supposed to come through the gate");
           sync.acquireShared(0);
        }
    
        public void shutOff(){
           if (Thread.currentThread() != gateKeeper)
              throw new IllegalStateException("Only a Gate Keeper thread is allowed to shut off");
            sync.shutOff();
        }
    
        public void openUp(){
           if (Thread.currentThread() != gateKeeper)
              throw new IllegalStateException("Only a Gate Keeper thread is allowed to open up");
           sync.releaseShared(0);
        }
    
        public void transferOwnership(Thread newGateKeeper){
           this.gateKeeper  = newGateKeeper;
        }
    
        // an addition of waiting interruptibly and waiting for specified amount of time, 
        //if they are needed, is trivial 
    }
    
person igor.zh    schedule 29.06.2020