В этой статье мы разработаем упрощенную версию DispatchQueue
под названием SerialQueue
. Это решение основано на двух основных методах: sync
и async
. Метод sync выполняет задачу в вызывающем потоке и ожидает ее завершения, тогда как асинхронный метод выполняет задачу в фоновом потоке (хотя и не всегда, но мы упростим этот аспект), не блокируя вызывающий поток. Для более полного понимания очередей перейдите по следующей ссылке.
В отличие от DispatchQueue GCD, мы создадим отдельный класс SerialQueue, который будет содержать только последовательную логику, исключая любые параллельные компоненты. В приведенном ниже фрагменте кода мы выделяем основные переменные:
thread
: предоставляет среду выполнения, в которой будет выполняться асинхронная задача.condition
: Организованное общение между методамиsync
иasync
.mutex
: обеспечивает безопасность потоков, предотвращая одновременный доступ к общим ресурсам.stop
: завершает цикл while в фоновом потоке, завершая выполнение потока.
final class SerialQueue { private var thread: pthread_t? private var condition = pthread_cond_t() private var mutex = pthread_mutex_t() // Used to stop executing background thread private var stop = false init() { let weakSelfPointer = Unmanaged.passUnretained(self).toOpaque() _ = pthread_create(&thread, nil, { (pointer: UnsafeMutableRawPointer) in let weakSelf = Unmanaged<SerialQueue>.fromOpaque(pointer).takeRetainedValue() weakSelf.runThread() pthread_exit(nil) }, weakSelfPointer) pthread_cond_init(&condition, nil) pthread_mutex_init(&mutex, nil) } }
Раздел инициализации содержит стандартные функции настройки, такие как pthread_cond_init
и pthread_mutex_init
. Он также создает фоновый поток через pthread_create
. Однако мы не можем напрямую вызвать self.runThread()
из замыкания, переданного в качестве параметра, в pthread_create
, так как компилятор идентифицирует замыкание как указатель на C-функцию и вернет ошибку. Чтобы решить эту проблему, мы создадим указатель Unmanaged
на self
, который будет передан в качестве параметра pthread_create
.
В замыкании мы используем Unmanaged<SerialQueue>
для получения типизированной ссылки на self
. Методы passUnretained
и takeRetainedValue
используются для управления счетчиком удержания, обеспечивая слабую ссылку на self
в этом контексте. Это означает, что мы не хотим увеличивать счетчик ссылок. Балансировка счетчика удержания может быть сложной задачей, поэтому использование CFGetRetainCount
(во время отладки) является эффективным подходом для внесения корректировок. Наконец, мы вызываем pthread_exit
для завершения потока, который будет выполнен, когда runThread
завершит свою работу, в нашем случае во времяdeinit
.
Базовый принцип реализации последовательной очереди основан на том, что все async
задач будут выполняться в фоновом потоке (тот, который мы только что создали), а sync
задач будут выполняться в вызывающем потоке. Механизм планирования этих задач заключается в использовании переменной condition
.
Следующим компонентом для разработки является Task
, который представляет реальную задачу, которую необходимо выполнить. Он включает тип async
или sync
, замыкание, содержащее логику, которая должна выполняться в очереди, и тип threadId
. Значение threadId
будет разъяснено позже в ходе обсуждения.
fileprivate final class Task { fileprivate enum TaskType: String { case sync case async } let type: TaskType let execute: (() -> ()) let threadId: UInt32 init(type: TaskType, threadId: UInt32, execute: @escaping (() -> ())) { self.type = type self.execute = execute self.threadId = threadId } }
Чтобы поддерживать порядок задач, мы вводим очередь, которая следует принципу FIFO. Другим важным аспектом этой очереди является безопасность потоков, поскольку она будет широко использоваться в многопоточной среде. Мы не будем слишком углубляться в реализацию самой очереди, так как она довольно проста.
fileprivate final class Queue { private var elements: [Task] = [] private var mutex = pthread_mutex_t() init() { pthread_mutex_init(&mutex, nil) } func enqueue(element: Task) { pthread_mutex_lock(&mutex) elements.append(element) pthread_mutex_unlock(&mutex) } func dequeue() -> Task? { pthread_mutex_lock(&mutex) defer { pthread_mutex_unlock(&mutex) } guard !elements.isEmpty else { return nil } return elements.removeFirst() } var isEmpty: Bool { pthread_mutex_lock(&mutex) defer { pthread_mutex_unlock(&mutex) } return elements.isEmpty } }
Итак, мы добавим очередь как приватную переменную в нашу реализацию SerialQueue
:
// Used to make an order for sync and async tasks private let queue = Queue()
Теперь у нас есть все необходимые примитивы, и мы можем приступить к разработке методов. Поскольку async
намного проще реализовать, мы начнем с него. Он получает threadId
вызывающего потока, добавляет задачу с типом async
в очередь задач и вызывает pthread_cond_broadcast
для пробуждения фонового потока. Еще раз хочу подчеркнуть, что threadId
будет использоваться позже, когда мы реализуем метод sync
.
func async(task: @escaping () -> ()) { let threadId = pthread_mach_thread_np(pthread_self()) queue.enqueue(element: Task(type: .async, threadId: threadId, execute: task)) pthread_cond_broadcast(&condition) }
Есть веская причина не использовать pthread_cond_signal
в этом контексте. Основная проблема с pthread_cond_signal
заключается в отсутствии гарантии того, какой pthread_cond_wait
он пробудит. Политика планирования в конечном итоге определяет, какой поток будет разблокирован первым. Вызвав pthread_getschedparam
, мы можем получить политику планирования для потока (которая часто имеет значение SCHED_OTHER
по умолчанию во многих системах), причем порядок основан на приоритете, определяемом параметром nice value
. Вместо того, чтобы углубляться в это направление, мы предоставим более универсальное решение, которое остается независимым от порядка вызова. Это достигается за счет использования pthread_cond_broadcast
, который разблокирует все методы ожидания с помощью одного и того же condition
. Кроме того, мы вводим цикл while, окружающий pthread_cond_wait
, чтобы предотвратить продолжение выполнения всех ожидающих задач, кроме той, которая удовлетворяет условию внутри цикла.
После добавления асинхронной задачи в очередь задач нам нужно выполнить задачи из очереди в фоновом потоке. Для этого мы создаем цикл while, который работает бесконечно, пока не будет установлен флаг остановки. Внутри этого бесконечного цикла есть еще один цикл, который извлекает задачи из очереди задач до тех пор, пока она не станет пустой.
Этот внутренний цикл задачи содержит условие, которое проверяет тип задачи; если это асинхронная задача, она будет выполняться в фоновом потоке. Вскоре мы обсудим компонент синхронизации более подробно. После выполнения задачи другой цикл while постоянно вызывает pthread_cond_wait
, если очередь задач пуста и флаг остановки не установлен. Этот конкретный сценарий мы обсуждали ранее, что предотвращает разблокировку потока, поскольку мы используем pthread_cond_broadcast
во всей нашей реализации.
private func runThread() { while(!stop) { while let task = queue.dequeue() { if task.type == .sync { // TODO Implement } else { task.execute() } } // Until the task queue is empty put on wait while(queue.isEmpty && !stop) { pthread_cond_wait(&condition, &mutex) } } }
Чтобы лучше понять алгоритм метода async
, существует схема, которая изображает его процесс работы как с вызывающим потоком, так и с фоновым потоком, предполагая, что в данный момент нет выполняемых задач:
Реализация не слишком сложна при работе только с асинхронными задачами в последовательной очереди. Однако объединение задач sync
и async
вносит дополнительную сложность в общую логику. Давайте теперь приступим к методу sync
. Прежде чем погрузиться в его реализацию, мы должны сначала представить еще один контейнер, который будет использоваться в методе sync
. Этот контейнер с именем SynchronizedDictionary
представляет собой простую потокобезопасную оболочку для контейнера Dictionary
. Он предлагает две основные операции: set
, которая добавляет значение в словарь, и value
, которая возвращает значение boolean
на основе ключа UInt32
.
fileprivate final class SynchronizedDictionary { private var storage: [UInt32: Bool] = [:] private var mutex = pthread_mutex_t() init() { pthread_mutex_init(&mutex, nil) } func add(value: Bool, key: UInt32) { pthread_mutex_lock(&mutex) storage[key] = value pthread_mutex_unlock(&mutex) } func value(key: UInt32) -> Bool? { pthread_mutex_lock(&mutex) defer { pthread_mutex_unlock(&mutex) } return storage[key] } }
Теперь мы можем добавить SynchronizedDictionary
в качестве поля к SerialQueue
:
// Used to store sync execution status per thread private var syncExecutionStates = SynchronizedDictionary()
Когда все компоненты подготовлены, мы готовы реализовать метод sync
. Во-первых, мы получаем идентификатор вызывающего потока (который будет использоваться позже в фоновом потоке), создаем задачу синхронизации и добавляем ее в очередь задач. Следующий шаг включает в себя установку значения false
в SynchronizedDictionary
для данного идентификатора потока, что означает, что задача синхронизации в настоящее время не выполняется. Этот процесс является важнейшим компонентом механизма планирования, поскольку он обеспечивает правильное последовательное выполнение как синхронных, так и асинхронных задач.
После этого мы блокируем выполнение и уведомляем фоновый поток, если он был настроен на ожидание. Затем мы приостанавливаем вызывающий поток до тех пор, пока фоновый поток не завершит выполнение других задач в очереди задач. Как только выполнение задачи завершено, мы помечаем вызывающий поток как невыполнимый с помощью syncExecutionStates
. Затем мы уведомляем фоновый поток о завершении выполнения sync
и разблокируем вызывающий поток.
func sync(task: @escaping () -> ()) { // Storing sync task with dedicated thread id into thread safe queue for the following execution let threadId = pthread_mach_thread_np(pthread_self()) queue.enqueue(element: Task(type: .sync, threadId: threadId, execute: task)) // Mark the task is NOT executing yet for the calling thread (threadId) syncExecutionStates.set(value: false, key: threadId) pthread_mutex_lock(&mutex) // Unblock the queue thread if it doesn’t execute any task pthread_cond_broadcast(&condition) // Put on wait current thread if the queue thread is executing the other task while(syncExecutionStates.value(key: threadId) == false) { pthread_cond_wait(&condition, &mutex) } // Execute the task on the calling thread task() // Mark the task is NOT executing for the calling thread (threadId) syncExecutionStates.set(value: false, key: threadId) pthread_cond_broadcast(&condition) pthread_mutex_unlock(&mutex) }
Мы намеренно поместили первую часть метода sync
вне блокировки по определенной причине. Чтобы лучше понять это, давайте рассмотрим пример: несколько вызовов sync
и async
выполняются из разных потоков. Если бы мы поместили pthread_mutex_lock
в начало метода sync
, это заблокировало бы метод, заставив другие вызовы sync
ждать, пока он не будет разблокирован. В то же время может произойти вызов async
. Так как метод async
не использует блокировки, он сразу добавит задачу в очередь задач перед методами синхронизации, которые были вызваны ранее. Это нарушит порядок выполнения, поэтому мы должны быть уверены, что задача будет добавлена в очередь задач сразу после вызова метода.
Та же причина применима и к syncExecutionStates
, который необходимо поместить вне блокировки, поскольку он может заблокировать вызывающий поток. Если фоновый поток попытается выполнить связанную задачу синхронизации, в то время как вызывающий поток заблокирован, вызывающий поток пропустит широковещательный вызов, и порядок выполнения будет скомпрометирован.
Первая часть логики sync
выполнена, и теперь мы переключимся на оставшуюся часть, которая происходит в фоновом потоке, отмеченном TODO
. Если тип задачи sync
, нам нужно уведомить связанный с ней метод sync
о том, что задача готова к выполнению. Для этого мы передаем threadId
в качестве параметра при добавлении задачи в очередь задач. Используя syncExecutionStates
, мы можем определить, что в данный момент выполняется задача с указанным threadId
.
Затем мы вызываем pthread_cond_broadcast
, чтобы разбудить метод sync
, позволяя ему выполнить задачу в соответствующем вызывающем потоке. Наконец, мы должны дождаться завершения выполнения задачи, используя pthread_cond_wait
.
private func runThread() { while(!stop) { while let task = queue.dequeue() { if task.type == .sync { // Mark the task is executing for the thread id syncExecutionStates.set(value: true, key: task.threadId) pthread_cond_broadcast(&condition) // Lock the queue thread while the sync task is executing on the calling thread while(syncExecutionStates.value(key: task.threadId) == true) { pthread_cond_wait(&condition, &mutex) } continue } else { task.execute() } } // Until the task queue is empty put on wait while(queue.isEmpty && !stop) { pthread_cond_wait(&condition, &mutex) } } }
SynchronizedDictionary
определяет идентификатор вызывающего потока, который должен быть выполнен в методе sync
. Например, представьте, что несколько потоков вызывают метод sync
одного и того же экземпляра SerialQueue
. Когда фоновый поток извлекает задачу синхронизации, он содержит идентификатор потока, что позволяет фоновому потоку идентифицировать конкретный поток, вызвавший задачу синхронизации.
Используя SynchronizedDictionary
, мы можем выбрать соответствующий поток (установив его значение threadId
в true
). Когда целевой поток просыпается после вызова pthread_cond_broadcast
, выполнение продолжится только для отмеченного потока. Тем временем другие потоки пройдут цикл while и вернутся в состояние ожидания.
Следующая диаграмма иллюстрирует алгоритм метода sync
, изображая общую логику между вызывающим потоком и фоновым потоком в предположении, что очередь задач пуста:
Действительно, методы sync
и async
предназначены для совместной работы как сплоченный механизм, когда один метод влияет на другой. Именно благодаря этой взаимосвязи мы широко используем переменную condition
в нашей реализации. Переменная condition
обеспечивает правильную координацию и синхронизацию между вызывающим и фоновым потоками, гарантируя, что задачи выполняются сериализованным образом.
Теперь нам нужно реализовать логику освобождения для SerialQueue
, что тоже не тривиально. Во-первых, мы устанавливаем флаг stop
в true
, что завершает основной цикл while в фоновом потоке в методе runThread
. Далее, если фоновый поток в данный момент не выполняет никаких задач, мы пробуждаем его с помощью pthread_cond_broadcast
. Это гарантирует, что фоновый поток сможет правильно выйти из цикла while. Затем мы вызываем pthread_join
, чтобы вызывающий поток ждал, пока фоновый поток не завершит свое выполнение. Этот шаг имеет решающее значение для предотвращения сбоев, которые могут возникнуть в результате доступа к освобожденной памяти. После завершения выполнения фонового потока мы приступаем к освобождению переменных condition
и mutex
.
deinit { // Set stop flag to finish while loop in the background thread stop = true // Wake background thread if it was put on wait pthread_cond_broadcast(&condition) // Wait until background thread finishes its execution if let thread = thread { pthread_join(thread, nil) } pthread_cond_destroy(&condition) pthread_mutex_destroy(&mutex) }
Вот схема, показывающая, как работает логика deinit
:
# тестов
Теперь мы внедрили все части комплексного решения. Хотя это может показаться сложным, особенно при решении многочисленных угловых случаев, и иногда их отлов становится практически невозможным. К счастью, есть методы и инструменты, которые помогут решить эти проблемы. Модульное тестирование является одним из таких методов. С точки зрения передовой практики разработки программного обеспечения модульные тесты играют ключевую роль. Они играют важную роль в обеспечении корректности и стабильности кода. Хотя существуют различные подходы к написанию тестов, в этой статье основное внимание будет уделено применению модульных тестов конкретно к SerialQueue
.
При работе с асинхронным кодом мы часто сталкиваемся с плавающими ошибками, которые возникают не при каждом выполнении. Чтобы помочь устранить эти ошибки, Xcode предлагает специальную функцию, называемую повторяющимся запуском. Эта функция позволяет запускать тест несколько раз подряд, повышая вероятность обнаружения и выявления спорадических проблем. Чтобы запустить повторный запуск, просто щелкните тест правой кнопкой мыши и выберите этот параметр в контекстном меню:
После этого мы можем указать количество повторений, условия завершения и другие параметры:
Пытаясь охватить большинство поведенческих сценариев для SerialQueue
, я написал различные модульные тесты. Хотя в этой статье мы рассмотрим только один такой тест, вы можете ознакомиться с остальными тестами, перейдя по ссылке. Я призываю вас поделиться в комментариях ниже любыми возможными краеугольными случаями, которые вы обнаружите.
В приведенном ниже тесте мы используем expectation
несколько раз, что является еще одним инструментом, помогающим тестировать асинхронный код. Он работает аналогично семафорам и условиям. Используя функцию wait
, мы блокируем вызывающий поток, пока все ожидания не сигнализируют о завершении, используя fulfill
. Как видите, одни методы вызываются из основного потока, а другие — из глобальной очереди. Существует массив result
, который, как ожидается, будет содержать упорядоченную последовательность [1, 2, 3, 4, 5], так как мы работаем с последовательной очередью, порядок должен гарантироваться последовательностью вызовов. Мы также решили не блокировать result
, так как только один поток может получить к нему доступ одновременно.
func testFromMainAndBackgroundThreads() { let serialQueue = SerialQueue() var result: [Int] = [] let expectation1 = expectation(description: “test1”) serialQueue.async { sleep(1) result.append(1) expectation1.fulfill() } let expectation2 = expectation(description: “test2”) serialQueue.async { sleep(1) result.append(2) expectation2.fulfill() } let expectation3 = expectation(description: “test3”) DispatchQueue.global().asyncAfter(deadline: .now() + 0.3) { serialQueue.sync { sleep(1) result.append(3) expectation3.fulfill() } } let expectation4 = expectation(description: “test4”) DispatchQueue.global().asyncAfter(deadline: .now() + 0.4) { serialQueue.async { sleep(1) result.append(4) expectation4.fulfill() } } let expectation5 = expectation(description: “test5”) DispatchQueue.global().asyncAfter(deadline: .now() + 0.5) { serialQueue.sync { sleep(1) result.append(5) expectation5.fulfill() } } wait(for: [expectation1, expectation2, expectation3, expectation4, expectation5], timeout: 10.0) XCTAssertEqual([1, 2, 3, 4, 5], result) }
В этой статье мы реализовали упрощенную версию последовательной очереди, которая широко используется в приложениях iOS, чтобы лучше понять ее функциональность. Важно знать, что исходная реализация очереди использует незаблокированные примитивы, которые могут обеспечить преимущества в производительности. Однако мы решили не использовать их здесь, чтобы не усложнять статью, так как даже с блокировками объяснение было не очень простым. Кроме того, не забудьте не использовать эту реализацию в производственной среде, так как она была создана исключительно в образовательных целях и может быть нестабильной в некоторых крайних случаях по сравнению с версией GCD.
Чтобы увидеть полную реализацию, не стесняйтесь посетить репозиторий GitHub.