Том Ван Эйк, инженер по разработке программного обеспечения, Workday
Эта статья, изначально опубликованная в нашем блоге разработчиков GitHub, посвящена менее известной конструкции синхронизации: переменной условия.
Когда дело доходит до статей о параллелизме, много цифровых чернил было пролито на тему мьютексов. Хотя знакомство программиста с мьютексами, вероятно, зависит от того, какие программы он обычно пишет, большинство разработчиков, как правило, хотя бы в некоторой степени знакомы с этими конкретными примитивами синхронизации. Однако в этой статье мы сосредоточимся на гораздо менее известной конструкции синхронизации: переменной условия.
Переменные условия используются для перевода потоков в спящий режим и их повторного пробуждения после выполнения определенного условия. Не волнуйтесь, если это звучит немного расплывчато; мы поговорим об этом подробнее позже. Поскольку условные переменные всегда необходимо использовать вместе с мьютексами, мы сделаем небольшое резюме по мьютексам. Далее мы расскажем о проблемах потребителя и производителя и о том, как элегантно их решить с помощью условных переменных. Затем мы рассмотрим, как использовать эти примитивы синхронизации для реализации вызовов методов блокировки. В заключение мы опишем любопытное поведение переменных условий и способы защиты от них.
Резюме мьютекса
Мьютекс - это структура данных для защиты общего состояния между несколькими потоками. Когда фрагмент кода заключен внутри мьютекса, мьютекс гарантирует, что только один поток одновременно может выполнить этот код. Если другой поток хочет начать выполнение этого кода, ему придется подождать, пока наш первый поток не закончит с ним. Я понимаю, что все это может показаться немного абстрактным, поэтому сейчас, вероятно, хорошее время, чтобы привести пример кода.
Запись в общее состояние
В этом первом примере мы посмотрим, что происходит, когда два потока пытаются изменить одну и ту же общую переменную. В приведенном ниже фрагменте показаны два метода: counters_with_mutex
и counters_without_mutex
. Оба метода начинаются с создания массива counters
с нулевой инициализацией перед созданием 5 потоков. Каждый поток выполнит 100 000 циклов, при этом каждая итерация будет увеличивать все элементы массива counters
на единицу. Оба метода одинаковы во всех отношениях, за исключением одного: только один из них использует мьютекс.
def counters_with_mutex mutex = Mutex.new counters = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
5.times.map do Thread.new do 100000.times do mutex.synchronize do counters.map! { |counter| counter + 1 } end end end end.each(&:join)
counters.inspect end
def counters_without_mutex counters = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
5.times.map do Thread.new do 100000.times do counters.map! { |counter| counter + 1 } end end end.each(&:join)
counters.inspect end
puts counters_with_mutex # => [500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000]
puts counters_without_mutex # => [500000, 447205, 500000, 500000, 500000, 500000, 203656, 500000, 500000, 500000] # note that we seem to have lost some increments here due to not using a mutex
Как видите, только метод, использующий мьютекс, дает правильный результат. Кажется, что метод без мьютекса потерял некоторые приращения. Это связано с тем, что отсутствие мьютекса позволяет нашему второму потоку прервать наш первый поток в любой момент во время его выполнения. Это может привести к серьезным проблемам.
Например, представьте, что наш первый поток только что прочитал первую запись массива counters
, увеличил ее на единицу и теперь готовится записать это увеличенное значение обратно в наш массив. Однако, прежде чем наш первый поток сможет записать это увеличенное значение, он прерывается вторым потоком. Затем этот второй поток переходит к чтению текущего значения первой записи, увеличивает его на единицу и успешно записывает результат обратно в наш массив counters
. Теперь у нас проблема!
У нас есть проблема, потому что первый поток был прерван прежде, чем у него была возможность записать увеличенное значение в массив. Когда первый поток возобновляется, он в конечном итоге перезаписывает значение, которое второй поток только что поместил в массив. Это приведет к тому, что мы по существу потеряем операцию приращения, что объясняет, почему в выводе нашей программы есть записи менее 500000.
Всех этих проблем можно избежать, используя мьютекс. Помните, что поток, выполняющий код, заключенный в мьютекс, не может чередоваться с другим потоком, желающим выполнить этот же код. Следовательно, наш второй поток никогда не перемежался бы с первым потоком, тем самым избегая возможности перезаписи результатов.
Чтение из общего состояния
Существует распространенное заблуждение, что мьютекс требуется только при записи в общую переменную, а не при чтении из нее. В приведенном ниже фрагменте показано, как 50 потоков снова и снова переворачивают логические значения в массиве flags
. Многие разработчики считают, что этот фрагмент не содержит ошибок, поскольку код, отвечающий за изменение этих значений, был заключен в мьютекс. Если бы это было так, то каждая строка вывода puts flags.to_s
должна состоять из 10 повторений либо true
, либо false
. Как мы увидим ниже, это не так.
mutex = Mutex.new flags = [false, false, false, false, false, false, false, false, false, false]
threads = 50.times.map do Thread.new do 100000.times do # don't do this! Reading from shared state requires a mutex! puts flags.to_s
mutex.synchronize do flags.map! { |f| !f } end end end end threads.each(&:join)
$ ruby flags.rb > output.log $ grep 'true, false' output.log | wc -l 30
Здесь происходит то, что наш мьютекс гарантирует только то, что никакие два потока не могут изменять массив flags
одновременно. Однако вполне возможно, что один поток начнет чтение из этого массива, в то время как другой поток занят его изменением, тем самым заставляя первый поток читать массив, содержащий как true
, так и false
записи. К счастью, всего этого можно легко избежать, заключив puts flags.to_s
в наш мьютекс. Это гарантирует, что только один поток одновременно может читать или записывать в массив flags
.
Прежде чем двигаться дальше, я просто хотел бы упомянуть, что даже очень опытные люди сбивались с толку из-за того, что не использовали мьютекс при доступе к общему состоянию. Фактически, в какой-то момент даже существовал шаблон проектирования Java, который предполагал, что не всегда использовать для этого мьютекс безопасно. Излишне говорить, что с тех пор в эту схему были внесены поправки.
Проблемы потребителя-производителя
Освободившись от этого обновления мьютексов, мы можем приступить к изучению условных переменных. Переменные состояния лучше всего объяснять, пытаясь найти практическое решение проблемы потребителя-производителя. Фактически, проблемы потребителя и производителя настолько распространены, что в Ruby уже есть структура данных, предназначенная для их решения: класс Queue. Этот класс использует условную переменную для реализации блокирующего варианта своего метода shift()
. В этой статье мы приняли сознательное решение не использовать класс Queue
. Вместо этого мы собираемся писать все с нуля с помощью условных переменных.
Давайте посмотрим на проблему, которую мы собираемся решить. Представьте, что у нас есть веб-сайт, на котором пользователи могут создавать задачи различной сложности, например сервис, который позволяет пользователям конвертировать загруженные изображения jpg в pdf. Мы можем думать об этих пользователях как о производителях постоянного потока задач произвольной сложности. Эти задачи будут храниться на внутреннем сервере, на котором запущено несколько рабочих процессов. Каждый рабочий процесс захватывает задачу, обрабатывает ее, а затем берет следующую. Эти работники - потребители нашей задачи.
С тем, что мы знаем о мьютексах, не должно быть слишком сложно написать фрагмент кода, имитирующий описанный выше сценарий. Скорее всего, это будет выглядеть примерно так.
tasks = [] mutex = Mutex.new threads = []
class Task def initialize @duration = rand() end
def execute sleep @duration end end
# producer threads threads += 2.times.map do Thread.new do while true mutex.synchronize do tasks << Task.new puts "Added task: #{tasks.last.inspect}" end # limit task production speed sleep 0.5 end end end
# consumer threads threads += 5.times.map do Thread.new do while true task = nil mutex.synchronize do if tasks.count > 0 task = tasks.shift puts "Removed task: #{task.inspect}" end end # execute task outside of mutex so we don't unnecessarily # block other consumer threads task.execute unless task.nil? end end end
threads.each(&:join)
Приведенный выше код должен быть довольно простым. Существует класс Task
для создания задач, выполнение которых занимает от 0 до 1 секунды. У нас есть 2 потока-производителя, каждый из которых выполняет бесконечный цикл while
, который безопасно добавляет новую задачу в массив tasks
каждые 0,5 секунды с помощью мьютекса. Наши 5 потребительских потоков также выполняют бесконечный while
цикл, каждая итерация захватывает мьютекс, чтобы безопасно проверить tasks
массив на наличие доступных задач. Если потребительский поток находит доступную задачу, он удаляет ее из массива и начинает ее обработку. После обработки задачи поток переходит к своей следующей итерации, тем самым повторяя цикл заново.
Хотя приведенная выше реализация, кажется, работает нормально, она не оптимальна, поскольку требует, чтобы все потребительские потоки постоянно опрашивали массив tasks
на предмет доступной работы. Этот опрос не является бесплатным. Интерпретатор Ruby должен постоянно планировать запуск потоков-потребителей, тем самым вытесняя потоки, которые могут выполнять действительно важную работу. В качестве примера приведенный выше код будет чередовать потоки потребителей, которые выполняют задачу, с потоками потребителей, которые просто хотят проверить наличие новых задач. Это может стать настоящей проблемой, когда существует большое количество потоков-потребителей и всего несколько задач.
Если вы хотите сами убедиться, насколько неэффективен этот подход, вам нужно только изменить исходный код для потребительских потоков с помощью кода, показанного ниже. Эта модифицированная программа печатает более тысячи строк This thread has nothing to do
для каждой отдельной строки Removed task
. Надеюсь, это даст вам представление об общей расточительности, когда потребительские потоки постоянно опрашивают массив tasks
.
# modified consumer threads code
threads += 5.times.map do
Thread.new do
while true
task = nil
mutex.synchronize do
if tasks.count > 0
task = tasks.shift
puts "Removed task: #{task.inspect}"
else
puts 'This thread has nothing to do'
end
end
# execute task outside of mutex so we don't unnecessarily
# block other consumer threads
task.execute unless task.nil?
end
end
end
Переменные условий на помощь
Итак, как мы можем создать более эффективное решение проблемы потребителя и производителя? Вот где в игру вступают условные переменные. Переменные условия используются для перевода потоков в спящий режим и их пробуждения только после выполнения определенного условия. Помните, что наше текущее решение проблемы производитель-потребитель далеко от идеала, потому что потокам-потребителям необходимо постоянно опрашивать появление новых задач. Все было бы намного эффективнее, если бы наши потребительские потоки могли переходить в спящий режим и просыпаться только при поступлении новой задачи.
Ниже показано решение проблемы потребителя-производителя, в котором используются условные переменные. Мы поговорим о том, как это работает, через секунду. А пока просто взгляните на код и, возможно, попробуйте его запустить. Если бы вы его запустили, вы, вероятно, увидели бы, что This thread has nothing to do
больше не отображается. Наш новый подход полностью избавился от потребительских потоков, занятых опросом массива tasks
.
Использование переменной условия теперь заставит наши потребительские потоки ждать, пока задача будет доступна в массиве tasks
, прежде чем продолжить. В результате теперь мы можем удалить некоторые проверки, которые должны были быть в нашем исходном потребительском коде. Я добавил несколько комментариев к приведенному ниже коду, чтобы выделить эти удаления.
tasks = [] mutex = Mutex.new cond_var = ConditionVariable.new threads = []
class Task def initialize @duration = rand() end
def execute sleep @duration end end
# producer threads threads += 2.times.map do Thread.new do while true mutex.synchronize do tasks << Task.new cond_var.signal puts "Added task: #{tasks.last.inspect}" end # limit task production speed sleep 0.5 end end end
# consumer threads threads += 5.times.map do Thread.new do while true task = nil mutex.synchronize do while tasks.empty? cond_var.wait(mutex) end
# the `if tasks.count == 0` statement will never be true as the thread # will now only reach this line if the tasks array is not empty puts 'This thread has nothing to do' if tasks.count == 0
# similarly, we can now remove the `if tasks.count > 0` check that # used to surround this code. We no longer need it as this code will # now only get executed if the tasks array is not empty. task = tasks.shift puts "Removed task: #{task.inspect}" end # Note that we have now removed `unless task.nil?` from this line as # our thread can only arrive here if there is indeed a task available. task.execute end end end
threads.each(&:join)
За исключением того, что мы удалили несколько операторов if
, наш новый код по сути идентичен нашему предыдущему решению. Единственное исключение - пять новых строк, показанных ниже. Не волнуйтесь, если некоторые из сопроводительных комментариев еще не совсем понятны. Сейчас также хорошее время, чтобы указать, что новый код как для потоков-производителей, так и для потоков-потребителей был добавлен в существующие блоки синхронизации мьютексов. Переменные условия не являются потокобезопасными и поэтому всегда должны использоваться вместе с мьютексом!
# declaring the condition variable cond_var = ConditionVariable.new
# a producer thread now signals the condition variable # after adding a new task to the tasks array cond_var.signal
# a consumer thread now goes to sleep when it sees that # the tasks array is empty. It can get woken up again # when a producer thread signals the condition variable. while tasks.empty? cond_var.wait(mutex) end
А теперь поговорим о новом коде. Начнем с фрагмента потребительских тем. На самом деле в этих трех строках происходит так много всего, что мы ограничимся рассмотрением того, что сейчас делает cond_var.wait(mutex)
. Мы объясним необходимость петли while tasks.empty?
позже. Первое, на что следует обратить внимание в методе wait
, - это параметр, который ему передается. Помните, как условная переменная не является потокобезопасной и поэтому ее методы должны вызываться только внутри блока синхронизации мьютекса? Это тот мьютекс, который необходимо передать в качестве параметра методу wait
.
Вызов wait
для переменной условия вызывает две вещи. Прежде всего, это переводит поток, который вызывает wait
, в спящий режим. То есть поток сообщит интерпретатору, что он больше не хочет быть запланированным. Однако этот поток по-прежнему владеет мьютексом, поскольку он переходит в спящий режим. Нам нужно убедиться, что поток отказывается от этого мьютекса, потому что в противном случае все другие потоки, ожидающие этого мьютекса, будут заблокированы. Передав этот мьютекс методу wait
, внутреннее устройство метода wait
гарантирует, что мьютекс будет освобожден, когда поток перейдет в спящий режим.
Перейдем к обсуждениям производителей. Эти потоки сейчас вызывают cond_var.signal
. Метод signal
довольно прост, поскольку он пробуждает ровно один из потоков, которые были переведены в спящий режим методом wait
. Этот только что пробужденный поток укажет интерпретатору, что он готов снова начать планирование, а затем будет ждать своей очереди.
Итак, какой код начинает выполняться наш только что пробудившийся поток после того, как он снова будет запланирован? Он начинает выполнение с того места, где остановился. По сути, только что пробудившийся поток вернется из своего вызова в cond_var.wait(mutex)
и возобновит работу оттуда. Лично мне нравится думать о вызове wait
как о создании точки сохранения внутри потока, из которой можно возобновить работу после того, как поток будет разбужен и снова перепланирован. Обратите внимание, что, поскольку поток хочет возобновить работу с того места, где он был первоначально остановлен, ему необходимо повторно получить мьютекс, чтобы его можно было запланировать. Это повторное получение мьютекса очень важно, поэтому обязательно запомните его.
Это хорошо объясняет, почему нам нужно использовать while tasks.empty?
при вызове wait
в потоке потребителя. Когда наш только что пробужденный поток возобновляет выполнение, возвращаясь из cond_var.wait
, первое, что он делает, это завершает свою ранее прерванную итерацию через цикл while
, тем самым снова оценивая while tasks.empty?
. Это фактически заставляет нас аккуратно избегать возможного состояния гонки.
Допустим, мы не используем цикл while
, а вместо этого используем оператор if
. Результирующий код будет выглядеть так, как показано ниже. К сожалению, с этим кодом очень сложно найти проблему. Обратите внимание, как теперь нам нужно повторно добавить ранее удаленные операторы if tasks.count > 0
и unless task.nil?
в наш код ниже, чтобы обеспечить его безопасное выполнение.
# consumer threads threads += 5.times.map do Thread.new do while true task = nil mutex.synchronize do cond_var.wait(mutex) if tasks.empty?
# using `if tasks.empty?` forces us to once again add this # `if tasks.count > 0` check. We need this check to protect # ourselves against a nasty race condition. if tasks.count > 0 task = tasks.shift puts "Removed task: #{task.inspect}" else puts 'This thread has nothing to do' end end # using `if tasks.empty?` forces us to re-add `unless task.nil?` # in order to safeguard ourselves against a now newly introduced # race condition task.execute unless task.nil? end end end
Представьте себе сценарий, в котором у нас есть:
- две производственные нити
- один потребительский поток, который не спит
- четыре потребительских потока, которые спят
Проснувшийся потребительский поток вернется в спящий режим только тогда, когда в массиве tasks
больше нет задач. То есть один потребительский поток будет продолжать обрабатывать задачи до тех пор, пока не закончатся доступные задачи. Теперь предположим, что один из наших потоков-производителей добавляет новую задачу в текущий пустой массив tasks
перед вызовом cond_var.signal
примерно в то же время, когда наш активный поток-потребитель завершает свою текущую задачу. Этот вызов signal
разбудит один из наших спящих потребительских потоков, который затем попытается запланировать себя. Вот где может произойти состояние гонки!
Сейчас мы находимся в положении, когда два потребительских потока соревнуются за право владения мьютексом, чтобы попасть в расписание. Допустим, наша первая потребительская ветка побеждает в этом соревновании. Этот поток теперь пойдет и возьмет задачу из массива tasks
перед тем, как отказаться от мьютекса. Затем наш второй потребительский поток захватывает мьютекс и запускается. Однако, поскольку массив tasks
сейчас пуст, второму потоку-потребителю не над чем работать. Таким образом, этот второй потребительский поток теперь должен выполнить всю итерацию своего while true
цикла без какой-либо реальной цели.
Теперь мы находимся в ситуации, когда полная итерация цикла while true
может произойти, даже если массив tasks
пуст. Это мало чем отличается от положения, в котором мы находились, когда наша программа просто была занята опросом массива tasks
. Конечно, наша текущая программа будет более эффективной, чем опрос занятости, но нам все равно нужно будет защитить наш код от возможности повторения, когда нет доступной задачи. Вот почему нам потребовалось повторно добавить операторы if tasks.count > 0
и unless task.nil?
. Особенно важен последний из этих двух, иначе наша программа может вылететь из-за ошибки NilException
.
К счастью, мы можем безопасно избавиться от этих легко игнорируемых мер предосторожности, заставив каждый вновь пробужденный потребительский поток проверять наличие доступных задач и снова переводя его в спящий режим, если задачи недоступны. Такого поведения можно добиться, заменив оператор if tasks.empty?
на цикл while tasks.empty?
. Если задачи доступны, только что пробужденный поток выйдет из цикла и выполнит остальную часть своего кода. Однако, если задачи не найдены, цикл повторяется, в результате чего поток снова переводится в спящий режим, выполняя cond_var.wait
. В следующем разделе мы увидим, что у этого while
цикла есть еще одно преимущество.
Создание нашего собственного класса Queue
В начале предыдущего раздела мы коснулись того, как переменные условия используются классом Queue
для реализации поведения блокировки. В предыдущем разделе мы узнали достаточно о переменных состояния, чтобы мы могли самостоятельно реализовать базовый Queue
класс. Мы собираемся создать потокобезопасный SimpleQueue
класс, который сможет:
- добавив к нему данные с помощью оператора
<<
- получение данных с помощью неблокирующего
shift
метода - получение данных из него с помощью
shift
метода блокировки
Достаточно легко написать код, отвечающий этим первым двум критериям. Вероятно, в конечном итоге он будет выглядеть примерно так, как показано ниже. Обратите внимание, что наш класс SimpleQueue
использует мьютекс, поскольку мы хотим, чтобы этот класс был потокобезопасным, как и исходный класс Queue
.
class SimpleQueue def initialize @elems = [] @mutex = Mutex.new end
def <<(elem) @mutex.synchronize do @elems << elem end end
def shift(blocking = true) @mutex.synchronize do if blocking raise 'yet to be implemented' end @elems.shift end end end
simple_queue = SimpleQueue.new simple_queue << 'foo'
simple_queue.shift(false) # => "foo"
simple_queue.shift(false) # => nil
Теперь давайте посмотрим, что нужно для реализации поведения блокировки shift
. Как оказалось, на самом деле это очень просто. Мы хотим, чтобы поток блокировался, только если метод shift
вызывается, когда массив @elems
пуст. Это вся информация, которая нам нужна, чтобы определить, где нам нужно разместить вызов нашей переменной условия для wait
. Точно так же мы хотим, чтобы поток прекратил блокировку, как только оператор <<
добавит новый элемент, в результате чего @elems
больше не будет пустым. Это говорит нам, где именно нам нужно разместить вызов signal
.
В конце концов, нам просто нужно создать переменную условия, которая переводит поток в спящий режим, когда блокирующий shift
вызывается для пустого SimpleQueue
. Точно так же оператору <<
просто нужно сигнализировать переменной условия, когда добавляется новый элемент, тем самым вызывая пробуждение спящего потока. Вывод из этого заключается в том, что методы блокировки работают, заставляя вызывающий их поток засыпать. Также обратите внимание, что вызов @cond_var.wait
происходит внутри цикла while @elems.empty?
. Всегда используйте цикл while
при вызове wait
для переменной условия! Никогда не используйте выражение if
!
class SimpleQueue def initialize @elems = [] @mutex = Mutex.new @cond_var = ConditionVariable.new end
def <<(elem) @mutex.synchronize do @elems << elem @cond_var.signal end end
def shift(blocking = true) @mutex.synchronize do if blocking while @elems.empty? @cond_var.wait(@mutex) end end @elems.shift end end end
simple_queue = SimpleQueue.new
# this will print "blocking shift returned with: foo" after 5 seconds # that is to say, the first thread will go to sleep until the second # thread adds an element to the queue, thereby causing the first thread # to be woken up again threads = [] threads << Thread.new { puts "blocking shift returned with: #{simple_queue.shift}" } threads << Thread.new { sleep 5; simple_queue << 'foo' } threads.each(&:join)
В приведенном выше коде следует указать на то, что @cond_var.signal
может быть вызван, даже если нет спящих потоков. Это совершенно нормальный поступок. В сценариях такого типа вызов @cond_var.signal
просто ничего не сделает.
Ложные пробуждения
Под ложным пробуждением понимается пробуждение спящего потока без выполнения какого-либо signal
вызова. Этого невозможно избежать в условных переменных. Важно отметить, что это не вызвано ошибкой в интерпретаторе Ruby или чем-то подобным. Вместо этого разработчики библиотек потоков, используемых в вашей ОС, обнаружили, что включение случайного ложного пробуждения значительно увеличивает скорость операций с переменными состояния. Таким образом, любой код, использующий переменные состояния, должен учитывать ложные пробуждения.
Значит ли это, что нам нужно переписать весь код, который мы написали в этой статье, чтобы сделать его устойчивым к возможным ошибкам, вызванным ложным пробуждением? Вы будете рады узнать, что это не так, поскольку все фрагменты кода в этой статье всегда заключают оператор cond_var.wait
в цикл while
!
Ранее мы рассмотрели, как использование цикла while
делает наш код более эффективным при работе с определенными условиями гонки, поскольку он заставляет только что пробужденный поток проверять, есть ли на самом деле что-нибудь для него, и если нет, поток возвращается в спящий режим. Этот же цикл while
помогает нам также справляться с ложными пробуждениями.
Когда поток просыпается из-за ложного пробуждения, и ему нечего делать, наше использование цикла while
заставит поток обнаружить это и вернуться в спящий режим. С точки зрения потока, пробуждение от ложного пробуждения ничем не отличается от пробуждения без каких-либо доступных задач. Таким образом, тот же механизм, который помогает нам справляться с условиями гонки, также решает нашу проблему ложного пробуждения. К настоящему времени должно быть очевидно, что while
циклы играют очень важную роль при работе с условными переменными.
Заключение
Переменные условий Ruby печально известны своей плохой документацией. Это позор, потому что это прекрасные структуры данных для эффективного решения очень специфического набора проблем. Хотя, как мы убедились, их использование не обходится без подводных камней. Я надеюсь, что этот пост поможет лучше понять их (и их ловушки) в более широком сообществе Ruby.
Я также чувствую, что должен указать, что, хотя все, что упомянуто выше, является правильным, насколько мне известно, я не могу гарантировать, что при написании этого не было абсолютно никаких ошибок. Как всегда, не стесняйтесь связываться со мной, если вы думаете, что я что-то не так, или даже если вы просто хотите поздороваться.