В этой статье мы разработаем упрощенную версию DispatchQueue под названием SerialQueue. Это решение основано на двух основных методах: sync и async. Метод sync выполняет задачу в вызывающем потоке и ожидает ее завершения, тогда как асинхронный метод выполняет задачу в фоновом потоке (хотя и не всегда, но мы упростим этот аспект), не блокируя вызывающий поток. Для более полного понимания очередей перейдите по следующей ссылке.

В отличие от DispatchQueue GCD, мы создадим отдельный класс SerialQueue, который будет содержать только последовательную логику, исключая любые параллельные компоненты. В приведенном ниже фрагменте кода мы выделяем основные переменные:

  • thread: предоставляет среду выполнения, в которой будет выполняться асинхронная задача.
  • condition: Организованное общение между методами sync и async.
  • mutex: обеспечивает безопасность потоков, предотвращая одновременный доступ к общим ресурсам.
  • stop: завершает цикл while в фоновом потоке, завершая выполнение потока.
final class SerialQueue {
  private var thread: pthread_t?
  private var condition = pthread_cond_t()
  private var mutex = pthread_mutex_t()

  // Used to stop executing background thread
  private var stop = false

  init() {
    let weakSelfPointer = Unmanaged.passUnretained(self).toOpaque()
    
    _ = pthread_create(&thread, nil, { (pointer: UnsafeMutableRawPointer) in
      let weakSelf = Unmanaged<SerialQueue>.fromOpaque(pointer).takeRetainedValue()
      weakSelf.runThread()
      pthread_exit(nil)
    }, weakSelfPointer)
    
    pthread_cond_init(&condition, nil)
    pthread_mutex_init(&mutex, nil)
  }
}

Раздел инициализации содержит стандартные функции настройки, такие как pthread_cond_init и pthread_mutex_init. Он также создает фоновый поток через pthread_create. Однако мы не можем напрямую вызвать self.runThread() из замыкания, переданного в качестве параметра, в pthread_create, так как компилятор идентифицирует замыкание как указатель на C-функцию и вернет ошибку. Чтобы решить эту проблему, мы создадим указатель Unmanaged на self, который будет передан в качестве параметра pthread_create.

В замыкании мы используем Unmanaged<SerialQueue> для получения типизированной ссылки на self. Методы passUnretained и takeRetainedValue используются для управления счетчиком удержания, обеспечивая слабую ссылку на self в этом контексте. Это означает, что мы не хотим увеличивать счетчик ссылок. Балансировка счетчика удержания может быть сложной задачей, поэтому использование CFGetRetainCount (во время отладки) является эффективным подходом для внесения корректировок. Наконец, мы вызываем pthread_exit для завершения потока, который будет выполнен, когда runThread завершит свою работу, в нашем случае во времяdeinit.

Базовый принцип реализации последовательной очереди основан на том, что все async задач будут выполняться в фоновом потоке (тот, который мы только что создали), а sync задач будут выполняться в вызывающем потоке. Механизм планирования этих задач заключается в использовании переменной condition.

Следующим компонентом для разработки является Task, который представляет реальную задачу, которую необходимо выполнить. Он включает тип async или sync, замыкание, содержащее логику, которая должна выполняться в очереди, и тип threadId. Значение threadId будет разъяснено позже в ходе обсуждения.

fileprivate final class Task {

  fileprivate enum TaskType: String {
    case sync
    case async
  }
  
  let type: TaskType
  let execute: (() -> ())
  let threadId: UInt32
  
  init(type: TaskType, threadId: UInt32, execute: @escaping (() -> ())) {
    self.type = type
    self.execute = execute
    self.threadId = threadId
  }
}

Чтобы поддерживать порядок задач, мы вводим очередь, которая следует принципу FIFO. Другим важным аспектом этой очереди является безопасность потоков, поскольку она будет широко использоваться в многопоточной среде. Мы не будем слишком углубляться в реализацию самой очереди, так как она довольно проста.

fileprivate final class Queue {
  private var elements: [Task] = []
  private var mutex = pthread_mutex_t()

  init() {
    pthread_mutex_init(&mutex, nil)
  }

  func enqueue(element: Task) {
    pthread_mutex_lock(&mutex)
    elements.append(element)
    pthread_mutex_unlock(&mutex)
  }
  
   func dequeue() -> Task? {
     pthread_mutex_lock(&mutex)
     defer {
       pthread_mutex_unlock(&mutex)
     }
     
     guard !elements.isEmpty else { return nil }
     
     return elements.removeFirst()
   }
   
   var isEmpty: Bool {
     pthread_mutex_lock(&mutex)
     defer {
       pthread_mutex_unlock(&mutex)
     }
     return elements.isEmpty
   }
}

Итак, мы добавим очередь как приватную переменную в нашу реализацию SerialQueue:

// Used to make an order for sync and async tasks
private let queue = Queue()

Теперь у нас есть все необходимые примитивы, и мы можем приступить к разработке методов. Поскольку async намного проще реализовать, мы начнем с него. Он получает threadId вызывающего потока, добавляет задачу с типом async в очередь задач и вызывает pthread_cond_broadcast для пробуждения фонового потока. Еще раз хочу подчеркнуть, что threadId будет использоваться позже, когда мы реализуем метод sync.

func async(task: @escaping () -> ()) {
  let threadId = pthread_mach_thread_np(pthread_self())
  queue.enqueue(element: Task(type: .async, threadId: threadId, execute: task))
  pthread_cond_broadcast(&condition)
}

Есть веская причина не использовать pthread_cond_signal в этом контексте. Основная проблема с pthread_cond_signal заключается в отсутствии гарантии того, какой pthread_cond_wait он пробудит. Политика планирования в конечном итоге определяет, какой поток будет разблокирован первым. Вызвав pthread_getschedparam, мы можем получить политику планирования для потока (которая часто имеет значение SCHED_OTHER по умолчанию во многих системах), причем порядок основан на приоритете, определяемом параметром nice value. Вместо того, чтобы углубляться в это направление, мы предоставим более универсальное решение, которое остается независимым от порядка вызова. Это достигается за счет использования pthread_cond_broadcast, который разблокирует все методы ожидания с помощью одного и того же condition. Кроме того, мы вводим цикл while, окружающий pthread_cond_wait, чтобы предотвратить продолжение выполнения всех ожидающих задач, кроме той, которая удовлетворяет условию внутри цикла.

После добавления асинхронной задачи в очередь задач нам нужно выполнить задачи из очереди в фоновом потоке. Для этого мы создаем цикл while, который работает бесконечно, пока не будет установлен флаг остановки. Внутри этого бесконечного цикла есть еще один цикл, который извлекает задачи из очереди задач до тех пор, пока она не станет пустой.

Этот внутренний цикл задачи содержит условие, которое проверяет тип задачи; если это асинхронная задача, она будет выполняться в фоновом потоке. Вскоре мы обсудим компонент синхронизации более подробно. После выполнения задачи другой цикл while постоянно вызывает pthread_cond_wait, если очередь задач пуста и флаг остановки не установлен. Этот конкретный сценарий мы обсуждали ранее, что предотвращает разблокировку потока, поскольку мы используем pthread_cond_broadcast во всей нашей реализации.

private func runThread() {
  while(!stop) {
    while let task = queue.dequeue() {
      if task.type == .sync {
        // TODO Implement
      } else {
        task.execute()
      }
    }
    
    // Until the task queue is empty put on wait
    while(queue.isEmpty && !stop) {
      pthread_cond_wait(&condition, &mutex)
    }
  }
}

Чтобы лучше понять алгоритм метода async, существует схема, которая изображает его процесс работы как с вызывающим потоком, так и с фоновым потоком, предполагая, что в данный момент нет выполняемых задач:

Реализация не слишком сложна при работе только с асинхронными задачами в последовательной очереди. Однако объединение задач sync и async вносит дополнительную сложность в общую логику. Давайте теперь приступим к методу sync. Прежде чем погрузиться в его реализацию, мы должны сначала представить еще один контейнер, который будет использоваться в методе sync. Этот контейнер с именем SynchronizedDictionary представляет собой простую потокобезопасную оболочку для контейнера Dictionary. Он предлагает две основные операции: set, которая добавляет значение в словарь, и value, которая возвращает значение boolean на основе ключа UInt32.

fileprivate final class SynchronizedDictionary {
  private var storage: [UInt32: Bool] = [:]
  private var mutex = pthread_mutex_t()
  
  init() {
    pthread_mutex_init(&mutex, nil)
  }

  func add(value: Bool, key: UInt32) {
    pthread_mutex_lock(&mutex)
    storage[key] = value
    pthread_mutex_unlock(&mutex)
  }

  func value(key: UInt32) -> Bool? {
    pthread_mutex_lock(&mutex)
    defer {
      pthread_mutex_unlock(&mutex)
    }
    return storage[key]
  }
}

Теперь мы можем добавить SynchronizedDictionary в качестве поля к SerialQueue:

// Used to store sync execution status per thread
private var syncExecutionStates = SynchronizedDictionary()

Когда все компоненты подготовлены, мы готовы реализовать метод sync. Во-первых, мы получаем идентификатор вызывающего потока (который будет использоваться позже в фоновом потоке), создаем задачу синхронизации и добавляем ее в очередь задач. Следующий шаг включает в себя установку значения false в SynchronizedDictionary для данного идентификатора потока, что означает, что задача синхронизации в настоящее время не выполняется. Этот процесс является важнейшим компонентом механизма планирования, поскольку он обеспечивает правильное последовательное выполнение как синхронных, так и асинхронных задач.

После этого мы блокируем выполнение и уведомляем фоновый поток, если он был настроен на ожидание. Затем мы приостанавливаем вызывающий поток до тех пор, пока фоновый поток не завершит выполнение других задач в очереди задач. Как только выполнение задачи завершено, мы помечаем вызывающий поток как невыполнимый с помощью syncExecutionStates. Затем мы уведомляем фоновый поток о завершении выполнения sync и разблокируем вызывающий поток.

func sync(task: @escaping () -> ()) {
  // Storing sync task with dedicated thread id into thread safe queue for the following execution
  let threadId = pthread_mach_thread_np(pthread_self())
  queue.enqueue(element: Task(type: .sync, threadId: threadId, execute: task))

  // Mark the task is NOT executing yet for the calling thread (threadId)
  syncExecutionStates.set(value: false, key: threadId)

  pthread_mutex_lock(&mutex)
  // Unblock the queue thread if it doesn’t execute any task
  pthread_cond_broadcast(&condition)

  // Put on wait current thread if the queue thread is executing the other task
  while(syncExecutionStates.value(key: threadId) == false) {
    pthread_cond_wait(&condition, &mutex)
  }
  // Execute the task on the calling thread
  task()
  
  // Mark the task is NOT executing for the calling thread (threadId)
  syncExecutionStates.set(value: false, key: threadId)
  pthread_cond_broadcast(&condition)
  pthread_mutex_unlock(&mutex)
}

Мы намеренно поместили первую часть метода sync вне блокировки по определенной причине. Чтобы лучше понять это, давайте рассмотрим пример: несколько вызовов sync и async выполняются из разных потоков. Если бы мы поместили pthread_mutex_lock в начало метода sync, это заблокировало бы метод, заставив другие вызовы sync ждать, пока он не будет разблокирован. В то же время может произойти вызов async. Так как метод async не использует блокировки, он сразу добавит задачу в очередь задач перед методами синхронизации, которые были вызваны ранее. Это нарушит порядок выполнения, поэтому мы должны быть уверены, что задача будет добавлена ​​в очередь задач сразу после вызова метода.

Та же причина применима и к syncExecutionStates, который необходимо поместить вне блокировки, поскольку он может заблокировать вызывающий поток. Если фоновый поток попытается выполнить связанную задачу синхронизации, в то время как вызывающий поток заблокирован, вызывающий поток пропустит широковещательный вызов, и порядок выполнения будет скомпрометирован.

Первая часть логики sync выполнена, и теперь мы переключимся на оставшуюся часть, которая происходит в фоновом потоке, отмеченном TODO. Если тип задачи sync, нам нужно уведомить связанный с ней метод sync о том, что задача готова к выполнению. Для этого мы передаем threadId в качестве параметра при добавлении задачи в очередь задач. Используя syncExecutionStates, мы можем определить, что в данный момент выполняется задача с указанным threadId.

Затем мы вызываем pthread_cond_broadcast, чтобы разбудить метод sync, позволяя ему выполнить задачу в соответствующем вызывающем потоке. Наконец, мы должны дождаться завершения выполнения задачи, используя pthread_cond_wait.

private func runThread() {
  while(!stop) {
    while let task = queue.dequeue() {
      if task.type == .sync {
        // Mark the task is executing for the thread id
        syncExecutionStates.set(value: true, key: task.threadId)
        pthread_cond_broadcast(&condition)
        
        // Lock the queue thread while the sync task is executing on the calling thread
        while(syncExecutionStates.value(key: task.threadId) == true) {
          pthread_cond_wait(&condition, &mutex)
        }
        continue
      } else {
        task.execute()
      }
    }

    // Until the task queue is empty put on wait
    while(queue.isEmpty && !stop) {
      pthread_cond_wait(&condition, &mutex)
    }
  }
}

SynchronizedDictionary определяет идентификатор вызывающего потока, который должен быть выполнен в методе sync. Например, представьте, что несколько потоков вызывают метод sync одного и того же экземпляра SerialQueue. Когда фоновый поток извлекает задачу синхронизации, он содержит идентификатор потока, что позволяет фоновому потоку идентифицировать конкретный поток, вызвавший задачу синхронизации.

Используя SynchronizedDictionary, мы можем выбрать соответствующий поток (установив его значение threadId в true). Когда целевой поток просыпается после вызова pthread_cond_broadcast, выполнение продолжится только для отмеченного потока. Тем временем другие потоки пройдут цикл while и вернутся в состояние ожидания.

Следующая диаграмма иллюстрирует алгоритм метода sync, изображая общую логику между вызывающим потоком и фоновым потоком в предположении, что очередь задач пуста:

Действительно, методы sync и async предназначены для совместной работы как сплоченный механизм, когда один метод влияет на другой. Именно благодаря этой взаимосвязи мы широко используем переменную condition в нашей реализации. Переменная condition обеспечивает правильную координацию и синхронизацию между вызывающим и фоновым потоками, гарантируя, что задачи выполняются сериализованным образом.

Теперь нам нужно реализовать логику освобождения для SerialQueue, что тоже не тривиально. Во-первых, мы устанавливаем флаг stop в true, что завершает основной цикл while в фоновом потоке в методе runThread. Далее, если фоновый поток в данный момент не выполняет никаких задач, мы пробуждаем его с помощью pthread_cond_broadcast. Это гарантирует, что фоновый поток сможет правильно выйти из цикла while. Затем мы вызываем pthread_join, чтобы вызывающий поток ждал, пока фоновый поток не завершит свое выполнение. Этот шаг имеет решающее значение для предотвращения сбоев, которые могут возникнуть в результате доступа к освобожденной памяти. После завершения выполнения фонового потока мы приступаем к освобождению переменных condition и mutex.

deinit {
  // Set stop flag to finish while loop in the background thread
  stop = true

  // Wake background thread if it was put on wait
  pthread_cond_broadcast(&condition)

  // Wait until background thread finishes its execution
  if let thread = thread {
    pthread_join(thread, nil)
  }

  pthread_cond_destroy(&condition)
  pthread_mutex_destroy(&mutex)
}

Вот схема, показывающая, как работает логика deinit:

# тестов

Теперь мы внедрили все части комплексного решения. Хотя это может показаться сложным, особенно при решении многочисленных угловых случаев, и иногда их отлов становится практически невозможным. К счастью, есть методы и инструменты, которые помогут решить эти проблемы. Модульное тестирование является одним из таких методов. С точки зрения передовой практики разработки программного обеспечения модульные тесты играют ключевую роль. Они играют важную роль в обеспечении корректности и стабильности кода. Хотя существуют различные подходы к написанию тестов, в этой статье основное внимание будет уделено применению модульных тестов конкретно к SerialQueue.

При работе с асинхронным кодом мы часто сталкиваемся с плавающими ошибками, которые возникают не при каждом выполнении. Чтобы помочь устранить эти ошибки, Xcode предлагает специальную функцию, называемую повторяющимся запуском. Эта функция позволяет запускать тест несколько раз подряд, повышая вероятность обнаружения и выявления спорадических проблем. Чтобы запустить повторный запуск, просто щелкните тест правой кнопкой мыши и выберите этот параметр в контекстном меню:

После этого мы можем указать количество повторений, условия завершения и другие параметры:

Пытаясь охватить большинство поведенческих сценариев для SerialQueue, я написал различные модульные тесты. Хотя в этой статье мы рассмотрим только один такой тест, вы можете ознакомиться с остальными тестами, перейдя по ссылке. Я призываю вас поделиться в комментариях ниже любыми возможными краеугольными случаями, которые вы обнаружите.

В приведенном ниже тесте мы используем expectation несколько раз, что является еще одним инструментом, помогающим тестировать асинхронный код. Он работает аналогично семафорам и условиям. Используя функцию wait, мы блокируем вызывающий поток, пока все ожидания не сигнализируют о завершении, используя fulfill. Как видите, одни методы вызываются из основного потока, а другие — из глобальной очереди. Существует массив result, который, как ожидается, будет содержать упорядоченную последовательность [1, 2, 3, 4, 5], так как мы работаем с последовательной очередью, порядок должен гарантироваться последовательностью вызовов. Мы также решили не блокировать result, так как только один поток может получить к нему доступ одновременно.

func testFromMainAndBackgroundThreads() {
  let serialQueue = SerialQueue()
  var result: [Int] = []
  
  let expectation1 = expectation(description: “test1”)
  serialQueue.async {
    sleep(1)
    result.append(1)
    expectation1.fulfill()
  }

  let expectation2 = expectation(description: “test2”)
  serialQueue.async {
    sleep(1)
    result.append(2)
    expectation2.fulfill()
  }

  let expectation3 = expectation(description: “test3”)
  DispatchQueue.global().asyncAfter(deadline: .now() + 0.3) {
    serialQueue.sync {
      sleep(1)
      result.append(3)
      expectation3.fulfill()
    }
  }

  let expectation4 = expectation(description: “test4”)
  DispatchQueue.global().asyncAfter(deadline: .now() + 0.4) {
    serialQueue.async {
      sleep(1)
      result.append(4)
      expectation4.fulfill()
    }
  }

  let expectation5 = expectation(description: “test5”)
  DispatchQueue.global().asyncAfter(deadline: .now() + 0.5) {
    serialQueue.sync {
      sleep(1)
      result.append(5)
      expectation5.fulfill()
    }
  }

  wait(for: [expectation1, expectation2, expectation3, expectation4, expectation5], timeout: 10.0)
  XCTAssertEqual([1, 2, 3, 4, 5], result)
}

В этой статье мы реализовали упрощенную версию последовательной очереди, которая широко используется в приложениях iOS, чтобы лучше понять ее функциональность. Важно знать, что исходная реализация очереди использует незаблокированные примитивы, которые могут обеспечить преимущества в производительности. Однако мы решили не использовать их здесь, чтобы не усложнять статью, так как даже с блокировками объяснение было не очень простым. Кроме того, не забудьте не использовать эту реализацию в производственной среде, так как она была создана исключительно в образовательных целях и может быть нестабильной в некоторых крайних случаях по сравнению с версией GCD.

Чтобы увидеть полную реализацию, не стесняйтесь посетить репозиторий GitHub.