Создание библиотеки фьючерсов с помощью каналов, горутин и выборок

Опубликовать 4 из серии на ходу

В предыдущих сообщениях блога мы рассмотрели, как использовать каналы для создания неограниченных очередей, пулов и управления несколькими параллельными запросами. Мы можем делать все это с помощью каналов, и в этих случаях абстракция имела смысл, но иногда мы хотим по-другому сформулировать проблемы параллелизма. Например, разработчики Node.js используют концепцию фьючерсов (иногда называемых обещаниями) для организации обратных вызовов и фоновых задач. Как мы можем построить фьючерсы на Go, чтобы добиться того же рабочего процесса?

Если вы не знакомы с фьючерсами, вот основная идея. Они представляют собой программную абстракцию, которая позволяет вам запускать небольшой фрагмент кода в фоновом режиме и немедленно возвращать ссылку на результат. Когда вы в конце концов попытаетесь прочитать результат по ссылке, future проверяет, был ли результат рассчитан. Если да, то немедленно возвращается. Если нет, вы ждете, пока не будет заполнен результат.

Фьючерсы обладают двумя важными свойствами. Во-первых, вам не нужно ждать результата фонового процесса, пока он вам действительно не понадобится. Во-вторых, использование фьючерсов скрывает много беспорядочного шаблона, который часто используется в параллельном программировании, что значительно упрощает отслеживание исходного кода.

Разработайте API

Итак, давайте разберемся, как будет выглядеть API для базового будущего. Мы хотим:

  • передать код для запуска
  • вернуть ссылку, которая в конечном итоге будет содержать значение, вычисленное кодом

Давайте сначала посмотрим на возвращаемый тип. Поскольку это Go, у нас есть пара специфических для языка деталей, на которые нужно обратить внимание.

Во-первых, опять же, в Go нет универсальных шаблонов, поэтому нам нужно писать в самых общих терминах, а наш клиентский код транслирует ответы на нужные типы данных. Это означает использование interface{} в качестве возвращаемого типа данных.

Во-вторых, идиоматический способ обработки ошибок в Go - это возвращение error в качестве последнего параметра, поэтому нам нужно также вернуть error на случай, если наши будущие ошибки исчезнут.

Мы не можем просто вернуть struct с заполненными значениями, потому что мы немедленно возвращаем ссылку и заполняем данные позже. Это означает, что нам нужно вызвать метод, который потенциально может ждать появления данных. Вместо того, чтобы использовать конкретный тип, раскрытие interface имеет наибольший смысл и дает нам максимальную гибкость. Итак, наш возвращаемый тип будет выглядеть так:

А что насчет функции, которую мы передаем? Поскольку в Go есть замыкания, мы можем использовать их для обертывания кода, который мы действительно хотим запустить. Это означает, что нам не нужно беспокоиться о передаче параметров нашей функции; мы захватим их из окружающей среды. Нам действительно нужно возвращать значения из нашего закрытия, чтобы мы могли заполнить нашу future.Interface реализацию, поэтому наша функция должна возвращать (interface{}, error).

Теперь, когда мы определили типы ввода и вывода, вот как выглядит первый проход нашего API:

Скоро заполним реализацию. Вот как выглядит клиентский код при создании будущего:

Первая реализация

Теперь, когда мы знаем, как код должен выглядеть для внешнего мира, мы можем начать думать о том, как он будет делать то, что ему нужно. Посмотрим, сможем ли мы реализовать наше будущее с помощью только стандартных механизмов параллелизма Go: select, каналов и горутин.

Во-первых, нам нужна структура для хранения данных и реализации интерфейса. Мы не хотим раскрывать его нашим клиентам, поскольку они должны использовать только интерфейс и фабричную функцию. Это означает, что первый символ имени должен содержать строчную букву.

Вы можете быть удивлены тем, что готовый канал типа struct{}. Мы собираемся воспользоваться шаблоном в Go, который сигнализирует о завершении работы. Когда канал не буферизован, попытка чтения из него приостановит выполнение горутины чтения до тех пор, пока не произойдет одно из двух: значение будет записано в канал или канал будет закрыт. В наш done канал никогда не будет записано значение; он существует только для того, чтобы быть закрытым, когда работа будет завершена. Это позволяет вам использовать закрытый канал как постоянный сигнал от одной горутины к другой, что можно продолжать.

Теперь нам нужно реализовать New:

В нашем коде New создает futureImpl, а затем запускает горутину, которая запускает входящую функцию, присваивает значение и ошибку, возвращаемую функцией, полям в нашем futureImpl и закрывает канал done. Между тем, метод Get ожидает закрытия канала done, а затем возвращает значение и ошибку. Если вы снова вызовете Get, закрытый канал done немедленно вернется, и мы вернем значение и ошибку без ожидания.

У нас есть базовая структура будущего, работающая примерно в 20 строках кода; совсем не так уж плохо.

Если вы хотите увидеть, как это работает, посмотрите эту ссылку на Go Playground.

Не жди вечно

Теперь, когда основы сделаны, пришло время добавить еще несколько функций в наше будущее. Первое, что может прийти в голову, это то, что было бы хорошо не ждать вечно, чтобы завершить будущее. В конце концов, будущее может длиться дольше, чем мы хотим ждать. Мы должны добавить второй метод в наш интерфейс, чтобы мы могли ограничить время ожидания:

Вызов GetUntil вместо Get будет ждать только указанного Duration. Если результат возвращается до истечения времени ожидания, мы хотим вернуться в этот момент. Если результат вернется после этого момента, мы хотим перестать ждать. Чтобы узнать, истекло ли время ожидания запроса, мы возвращаем логический флаг в дополнение к значению и ошибке. Если флаг установлен в значение true, то время ожидания запроса истекло. Если оно ложно, то возвращаемое значение и ошибка были значением и ошибкой, вычисленными нашей функцией.

Для поддержки GetUntil нам не нужно изменять New или Get. Нам просто нужно добавить следующую реализацию метода:

Оператор select ожидает как на done канале в f, так и на канале, возвращаемом time.After. Какой результат вернется первым, определяет, какой результат будет возвращен.

Также обратите внимание, что если GetUntil истекло время, значит, вы не потеряли свою работу. Вы можете позвонить Get или GetUntil еще раз и получить результат будущего. Все, что делает GetUntil, - это не дает вам дождаться расчета результата.

Вот пример кода клиента, использующего GetUntil:

Если вы хотите увидеть, как это работает, посмотрите эту ссылку на Go Playground.

Постройте цепочку

Отлично, наша будущая реализация продвигается. Добавим еще кое-что.

Еще одна особенность, которую обычно имеют фьючерсы, - это возможность объединяться в цепочку. Если у вас есть несколько длительных процессов, которые зависят друг от друга, один за другим, было бы неплохо, если бы future автоматически перешел от одного к другому. Если один вызов в цепочке дает сбой, мы хотим остановить обработку работы в цепочке и вернуть ошибку, вызвавшую сбой. Это означает, что в наш интерфейс необходимо добавить новые методы:

Наш новый метод называется Then. Он принимает функцию и возвращает новый будущий интерфейс. Функция, переданная в Then, немного отличается от функции, переданной в New. У этого есть единственный входной параметр, который является результатом предыдущего шага в цепочке, отсюда и название.

Еще раз, мы можем добавить новую реализацию метода без изменения какого-либо существующего кода. Мы собираемся вызвать существующую New функцию, чтобы создать новое будущее. Поскольку функция New ожидает функцию без входных параметров, мы собираемся передать ей замыкание, которое выполняет три функции:

1. Вызовите метод Get для предыдущего будущего в цепочке.

2. Проверьте, не возвращало ли предыдущее будущее ошибку. Если это так, верните его.

3. В противном случае вызовите следующую функцию с результатом из предыдущего будущего.

Добавить Then шаг в будущее довольно просто:

С этим дополнением у нас есть довольно полная будущая реализация. Мы можем запускать задания в фоновом режиме, выбирать, как долго ждать их завершения, и объединять задания в цепочку с очень кратким синтаксисом. Но добавление будущей цепочки означает, что было бы неплохо иметь еще одну вещь: отмена.

Вы можете увидеть этот пример кода, выполняющийся по этой ссылке на Go Playground.

Сразись с будущим

Разница между отменой и тайм-аутом тонкая, но важная. Тайм-аут означает, что я не хочу прямо сейчас ждать завершения работы, но работа продолжается в фоновом режиме, на случай, если вы захотите вернуться за результатами позже.

Отмена означает, что меня не волнуют результаты, и если есть какие-либо дополнительные шаги после завершения текущего, не запускайте их.

В отличие от других наших дополнений, добавление поддержки отмены требует изменения существующего кода. Это имеет смысл; мы добавляем способ прервать то, что раньше нельзя было прерывать. К счастью, функции параллелизма в Go позволяют легко добавить эту поддержку.

Во-первых, нам нужно добавить возможность отмены в наш будущий интерфейс:

Следует четко понимать, что Cancel не может остановить выполнение текущей функции; как только функция начинает выполняться, вы ничего не можете сделать извне, чтобы остановить ее. Все, что мы можем сделать, - это немедленно перестать ждать результатов и предотвратить запуск каких-либо дополнительных элементов в цепочке Then.

Затем мы собираемся изменить futureImpl и добавить наши реализации Cancel и IsCancelled:

Нам нужно добавить поле в futureImpl, чтобы мы могли отслеживать статус отмены. Реализации Cancel и IsCancelled просты. В Cancel мы закрываем cancel канал только в том случае, если он еще не закрыт и если done не был закрыт первым. Оба метода используют предложение default в select, чтобы гарантировать немедленный ответ.

Вы могли заметить, что в futureImpl добавлено еще одно новое поле. Одно из правил Go для каналов - закрытие канала более одного раза вызывает панику. Поскольку паника убьет работающее приложение, нам нужно убедиться, что наш канал cancel закрывается только один раз. Если несколько горутин одновременно вызывают Cancel в одном и том же будущем, возможно, что несколько горутин вернутся к регистру по умолчанию в выбранном.

Стандартная библиотека Go предлагает решение этой проблемы. Он включает тип под названием sync.Once. Заключение закрытия канала отмены в закрытие, вызываемое методом sync.Once.Do, гарантирует, что только одна горутина фактически закроет канал, независимо от того, сколько горутин попытаются сделать это одновременно.

Чтобы заполнить и использовать канал cancel, нам нужно изменить New, Then, Get и GetUntil:

Изменения в Get и GetUntil просто добавляют проверку для cancel канала. Изменения в New немного сложнее. Мы хотим отменить один раз и повлиять на всю будущую цепочку. Самый простой способ сделать это - чтобы все фьючерсы в цепочке слушали один и тот же канал отмены. У начальной записи в цепочке не будет канала отмены, поэтому нам нужно его создать. Все последующие Steps, добавленные в Then, будут использовать канал отмены, созданный для первого будущего.

Совместное использование ссылки на один и тот же канал передает состояние отмененного канала на все этапы будущей цепочки. Поскольку закрытый канал всегда немедленно возвращается, все горутины могут считывать значение без конфликтов.

Вот пример кода, демонстрирующего отмену:

Вы можете запустить этот код по этой ссылке на Go Playground.

Поместите будущее в контекст

Что, если мы хотим, чтобы что-то помимо нашей непосредственной функции инициировало отмену? Возможно, у нас есть запрос, который должен выполняться только в течение определенного времени. В этом случае мы должны использовать Context. Go 1.7 добавил Context в стандартную библиотеку в версии 1.7. Но что это?

Контекст - это решение двух частично совпадающих проблем в Go:

  1. Как мне поддерживать локальные переменные горутины?

2. Как мне подать сигнал горутине, что она должна перестать работать?

Контекст - это интерфейс, который обеспечивает доступ к:

  • Пары ключ / значение, которые передаются с более ранних этапов запроса на более поздние этапы (например, получение пользователя в обработчике аутентификации и его использование как часть запроса к базе данных).
  • Готовый канал, который закрывается, когда приходит время прекратить работу над текущим запросом или когда контекст был явно отменен.
  • Крайний срок, который сообщает вам, когда закрывается завершенный канал.
  • Ошибка, которая сообщает вам, почему был закрыт канал done. Пакет context содержит функции для создания контекстов с крайними сроками и значениями, а также для получения доступа к функции отмены для контекста.

Мы можем воспользоваться поддержкой отмены контекста, чтобы создать запланированные отмены для наших фьючерсов. Все, что нужно, - это добавить новую фабричную функцию для фьючерсов NewWithContext:

Если нам предоставлен Context с установленным каналом отмены, нам нужно настроить горутину, которая ждет, чтобы увидеть, отменен ли контекст, прежде чем будущее завершит свою работу. Если это так, мы инициируем отмену в будущем.

Код для его использования выглядит так:

Вы можете запустить этот код по этой ссылке на Go Playground.

Будущие расширения

Есть дополнительные функции, которые будут найдены в других будущих реализациях. Есть много вариантов, но некоторые из них включают:

  • Получение значения и ошибки и немедленный возврат независимо от того, выполнено это или нет.
  • Этапы выполняются только при возникновении ошибки.
  • Указание нескольких функций для будущего запуска и ожидание завершения любой из них.
  • Указание нескольких функций для будущего запуска и ожидание, пока все они не будут выполнены.
  • Объединение значений из нескольких фьючерсов в одно будущее.

Добавление этих функций выходит за рамки этого поста, но все они могут быть созданы с использованием строительных блоков, которые я уже описал. Каналы, горутины и выборки позволяют нам создавать эти сложные структуры, используя простой код, который намного легче понять, чем обычный набор потоков и мьютексов, предоставляемый большинством других языков.

Если кто-то заинтересован в использовании этой будущей библиотеки, Capital One имеет открытый исходный код более продвинутой версии, которую можно найти по адресу https://github.com/capitalone/go-future-context. Не стесняйтесь разветвлять его и отправлять запросы на включение с дополнительными функциями и исправлениями ошибок.

ЗАЯВЛЕНИЕ О РАСКРЫТИИ ИНФОРМАЦИИ: это мнение автора. Если в этом посте не указано иное, Capital One не является аффилированным лицом и не поддерживается ни одной из упомянутых компаний. Все используемые или отображаемые товарные знаки и другая интеллектуальная собственность являются собственностью их соответствующих владельцев. Эта статья принадлежит © Capital One, 2017 г.

Дополнительные ссылки