Серийный доступ к коллекции с использованием ConcurrentLinkedQueue‹T›

У меня есть следующий вопрос, касающийся Java 7 ConcurrentLinkedQueue. Предположим, что у меня есть следующий класс:

public class Blah {
    private ConcurrentLinkedQueue<String> queue;

    public Blah() {
        queue = new ConcurrentLinkedQueue<String>();
    }

    public void produce(String action, String task) throws InterruptedException {
        synchronized(queue) {
            while(queue.size() >= 8) 
                queue.wait();
            queue.add(action + "#" + task);
            queue.notifyAll();
        }
    }

    public void consume() throws InterruptedException {
        synchronized(queue) {
            while(queue.size() <= 0)
                queue.wait();
            String element = queue.poll();
            StringTokenizer strTok = new StringTokenizer(element, "#");
            String action = strTok.nextToken();
            String task = strTok.nextToken();
            /**
             * Operate on request
             */
            queue.notifyAll();
        }
    }
}

Функции производства и потребления будут вызываться параллельными потоками для создания/удаления потоков для/из списка. Я реализую предыдущие функции потребления() и производства(), чтобы сериализовать добавление/удаление элементов в моей очереди. Требуется ли вышеперечисленное, или ConcurrentLinkedQueue позаботится об этом? Я спрашиваю, потому что не хочу снижать производительность своего кода.

Спасибо, Ник.


person nick.katsip    schedule 12.04.2015    source источник
comment
Зачем ты синхронизируешься??   -  person Boris the Spider    schedule 13.04.2015
comment
@Boris Потому что я хочу, чтобы все операции внутри критической секции выполнялись атомарно.   -  person nick.katsip    schedule 13.04.2015
comment
Это ConcurrentLinkedQueue. Он гарантирует атомарность и видимость при правильном использовании. Вы переводите неблокирующий Queue в BlockingQueue. Не.   -  person Boris the Spider    schedule 13.04.2015


Ответы (3)


TL;DR: вы используете Queue, специально разработанный для неблокировки, как BlockingQueue.

Ваш код можно переписать как:

public class Blah {
    private BlockingQueue<String> queue;

    public Blah() {
        queue = new LinkedBlockingQueue<>(8);
    }

    public void produce(String action, String task) throws InterruptedException {
        while (true) {
            queue.put(action + "#" + task);
        }
    }

    public void consume() throws InterruptedException {
        while (true) {
            final String[] data = queue.take().split("#");
            final String action = data[0];
            final String task = data[1];
        }
    }
}

BlockingQueue ограничено 8 элементами. put заблокируется, если очередь заполнена. take заблокируется, если очередь пуста.

Синхронизация не требуется.

Кроме того, StringTokenizer устарел. Я бы посоветовал вам использовать class что-то вроде:

public class Data {
    private final String action;
    private final String task;

    public Data(final String action, final String task) {
        this.action = action;
        this.task = task;
    }

    public String getAction() {
        return action;
    }

    public String getTask() {
        return task;
    }
}

Для обмена данными. Нет причин создавать и анализировать Strings.

person Boris the Spider    schedule 12.04.2015

Вы действительно снизили производительность своего кода, поскольку используете встроенную синхронизацию, которая является «самой медленной» из механизма синхронизации.

У вас есть идеальный вариант использования BlockingQueue. Что предоставляет вам операции put и take, которые блокируются до тех пор, пока пространство/элемент не станет доступным.

ConcurrentLinkedQueue предлагает вам только безопасность потоков, а не синхронизацию. Это означает, что вы можете безопасно добавлять/удалять элементы из очереди в многопоточном приложении, но не предоставляет вам механизма для ожидания пробела/элементов, поэтому вы правильно использовали методы wait(), notify() для этой цели (хотя вы могли бы синхронизируется и с любым другим общим объектом, это не обязательно должна быть очередь). Использование Lock вместо синхронизированного намного быстрее в Java.

person Zielu    schedule 12.04.2015
comment
Основная причина, по которой я хотел иметь синхронизированный блок, заключается в том, что я хочу, чтобы часть кода, работающая по запросу, выполнялась атомарным образом. - person nick.katsip; 13.04.2015
comment
Я не знаю, что вы подразумеваете под запросом части вашего кода, я не вижу такой вещи. BlockingQueue add() реализует именно эту часть кода: while(queue.size() ›= 8) queue.wait(); очередь.добавить(действие + # + задача); очередь.notifyAll(); но более эффективным способом - person Zielu; 13.04.2015

Предполагая, что produce и consume будут вызываться независимо, вы, вероятно, получите бесконечный цикл, поскольку оба пытаются заблокировать объект очереди, и, таким образом, как только consume не найдет никаких элементов, он никогда не снимет блокировку, предотвращающую добавление элементов. . Логика синхронизации, вероятно, должна быть реализована в методах, вызывающих consume и produce.

person kpentchev    schedule 12.04.2015
comment
они освобождают блокировку в операции ожидания(). - person Zielu; 13.04.2015