Где ошибка в очереди блокировки?

Я написал реализацию очереди без блокировки Java. У него есть ошибка параллелизма. Я не могу найти это. Этот код не важен. Я просто беспокоюсь о том, что не могу объяснить наблюдаемое поведение, связанное с изменчивыми переменными.

Баг виден по исключению ("null head"). Это невозможное состояние, потому что существует атомарное целое число, содержащее текущий размер очереди. Очередь имеет элемент-заглушку. Он предусматривает, что поток чтения не изменяет хвостовой указатель, а поток записи не изменяет указатель начала.

Переменная длины очереди гарантирует, что связанный список никогда не будет пустым. Это как семафор.

Метод take ведет себя так, как будто получает украденное значение длины.

class Node<T> {
    final AtomicReference<Node<T>> next = new AtomicReference<Node<T>>();
    final T ref;
    Node(T ref) {
        this.ref = ref;
    }
}
public class LockFreeQueue<T> {
    private final AtomicInteger length = new AtomicInteger(1);
    private final Node stub = new Node(null);
    private final AtomicReference<Node<T>> head = new AtomicReference<Node<T>>(stub);
    private final AtomicReference<Node<T>> tail = new AtomicReference<Node<T>>(stub);

    public void add(T x) {
        addNode(new Node<T>(x));
        length.incrementAndGet();
    }

    public T takeOrNull() {
        while (true) {
            int l = length.get();
            if (l == 1) {
                return null;
            }
            if (length.compareAndSet(l, l - 1)) {
                break;
            }
        }
        while (true) {
            Node<T> r = head.get();
            if (r == null) {
                throw new IllegalStateException("null head");
            }
            if (head.compareAndSet(r, r.next.get())) {
                if (r == stub) {
                    stub.next.set(null);
                    addNode(stub);
                } else {
                    return r.ref;
                }
            }
        }
    }

    private void addNode(Node<T> n) {
        Node<T> t;
        while (true) {
            t = tail.get();
            if (tail.compareAndSet(t, n)) {
                break;    
            }
        }
        if (t.next.compareAndSet(null, n)) {
            return;
        }
        throw new IllegalStateException("bad tail next");
    }
}

person Daneel Yaitskov    schedule 20.12.2013    source источник
comment
Как этот код предотвращает гонки данных, когда механизм блокировки не используется? Почему вы не хотите использовать замок?   -  person Chriss    schedule 20.12.2013
comment
Когда вы видите проблему? Вы получаете это с помощью одного потока чтения или вам нужно несколько читателей, прежде чем вы увидите проблему? Я подозреваю, что проблема связана с несколькими потоками чтения, находящимися во втором цикле while для takeOrNull.   -  person matt helliwell    schedule 20.12.2013
comment
Это не производственный код. Отнеситесь к этому как к упражнению.   -  person Daneel Yaitskov    schedule 21.12.2013
comment
Я тестирую эту очередь 100 читателей и 100 писателей.   -  person Daneel Yaitskov    schedule 21.12.2013


Ответы (1)


Я думаю, что есть ошибка в том, как вы используете счетчик в takeOrNull(), когда вы удаляете заглушку, вы уменьшаете длину на 1, но не увеличиваете ее повторно при добавлении заглушки обратно в конце, так как вы используете addNode () вместо добавления(). Допустим, вы успешно добавили элемент, поэтому ваша очередь выглядит так:

Length is 2
STUB -> FIRST_NODE -> NULL
 ^          ^
 |          |
Head       Tail

Итак, теперь один поток начинает выполнять takeOrNull(), длина уменьшается до 1, Head перемещается в FIRST_NODE, и, поскольку это узел STUB, он снова добавляется в конец, так что теперь у вас есть:

Length is 1
FIRST_NODE -> STUB -> NULL
 ^             ^
 |             |
Head          Tail

Ты видишь? Длина теперь 1! При следующем вызове takeOrNull() вы получите NULL, даже если FIRST_NODE все еще находится в очереди и никогда не возвращался... Вы только что (временно) потеряли часть данных. Кроме того, теперь вы можете повторять это до бесконечности и начинать накапливать узлы. Например, если вы добавите три узла, длина равна 4, и у вас есть FIRST, STUB, NEW1, NEW2, NEW3. Если вы затем сделаете три takeOrNull(), вы получите NEW2, NEW3, STUB и Length 1. Таким образом, вы в конечном итоге потеряете элементы, но я признаю, что не совсем уверен в том, как это когда-либо вызовет исключение. Позвольте мне поесть и подумать об этом еще немного. ;-)

РЕДАКТИРОВАТЬ: Хорошо, еда пошла мне на пользу, я придумал последовательность, которая вызывает исключение head null. Давайте начнем с действительной очереди с одним элементом, как раньше:

Length is 2
STUB -> FIRST_NODE -> NULL
 ^          ^
 |          |
Head       Tail

Теперь у нас есть четыре потока, два из которых пытаются одновременно использовать функцию takeOrNull() и две функции add(). Оба потока добавления правильно переместили указатель хвоста, первый переместил хвост с FIRST на SECOND, а затем был приостановлен. Второй поток добавления переместил затем хвост из SECOND в THIRD, затем обновил указатель next старого хвоста (SECOND), а затем увеличил счетчик и вышел. У нас осталось:

Length is 3
STUB -> FIRST_NODE -> NULL          SECOND_NODE ->  THIRD_NODE -> NULL
 ^                                                     ^
 |                                                     |
Head                                                  Tail

Теперь два потока takeOrNull пробуждаются и выполняются, поскольку Length равен 3, оба смогут получить элемент! Первый перемещает Head из STUB в FIRST, второй перемещает Head из FIRST в NULL. И теперь HEAD равен нулю, и всякий раз, когда вызывается takeOrNull(), ИСКЛЮЧЕНИЕ!

person llongi    schedule 20.12.2013