Задача

При обучении моделей обычно данные требуют подготовки и обработки перед их подачей в качестве входных данных для модели. Некоторые из этих операций просты и требуют очень мало процессорного времени, в то время как другие могут использовать значительные ресурсы ЦП.

В этом посте описывается, как использовать ZeroMQ для создания горизонтально масштабируемой среды генерации данных для обучения моделей Keras. Это позволяет выполнять более ресурсоемкие операции на любом количестве серверов. При достаточной пропускной способности сети вы можете поддерживать свой графический процессор большим количеством потоковых данных.

Хотя это выходит за рамки этого документа, вы можете создавать более продвинутые архитектуры обучения и даже изменять источники и функции обучающих данных в зависимости от состояния обучения, связывая стандартные обратные вызовы Keras с генераторами данных.

Полный источник, использованный для этого поста, находится на GitHub: https://github.com/albertoa/keras_zmq_scaling_data_generators.

Шаблон связи

ZeroMQ — это распределенная библиотека обмена сообщениями, которая реализует несколько шаблонов. Он небольшой, высокопроизводительный и обеспечивает привязки примерно к 50 языкам.

Для этого простого случая мы будем использовать нескоординированную PUSH/PULL архитектуру. Рабочие генератора данных не скоординированы, потому что они предполагают, что они должны продолжать генерировать данные с момента их запуска до тех пор, пока они работают. Мы используем функцию «высокой отметки», чтобы заблокировать воркеров, как только они поставят в очередь достаточно данных. Когда приемник потребляет данные из сокета PULL, он освобождает место, и рабочие процессы автоматически начинают ставить в очередь больше данных.

Мы будем называть машину, на которой выполняется обучение Keras, приемником, открывающим сокет типа ZeroMQ PULL. Все сгенерированные данные попадут в сокет PULL приемника. Вот как мы открываем сокет PULL (полный листинг кода будет показан ниже).

# Get a ZMQ context
context = zmq.Context()
# Socket for getting the training data
train_socket = context.socket(zmq.PULL)
# Set the high water mark to 2 * batch_size
train_socket.set_hwm(2 * params["batch_size"])
train_socket.bind("tcp://*:"+str(params["train_zmq_port"]))

Один или несколько процессов генератора данных, называемых рабочими, будут открывать сокеты типа ZeroMQ PUSH и подключаться к сокету ZeroMQ PULL. Эти рабочие процессы могут выполняться на том же компьютере, что и приемник, или на удаленных серверах. zmq_dst — это URL-адрес сокета PULL, вы можете увидеть, как он устроен, в полном листинге ниже.

# Create the ZeroMQ context
zmq_context = zmq.Context()
# Socket to send messages
zmq_send_socket = zmq_context.socket(zmq.PUSH)
# Change the high water mark to twice the batch_size (must be
# done prior to the connect)
zmq_send_socket.set_hwm(2 * params["batch_size"])
# Connect to the sink
zmq_send_socket.connect(zmq_dst)

Порядок запуска не имеет значения

В показанной архитектуре порядок запуска приложений не имеет значения. Вы можете сначала запустить обучение Keras (приемник), а затем группу генераторов данных (воркеры) или наоборот. Вы также можете запускать или останавливать воркеры в любое время, поэтому, если вы понимаете, что вам нужно больше воркеров, чтобы насытить сетевое соединение, продолжайте и запускайте новых. Когда модель закончит обучение, вам не нужно беспокоиться о рабочих. Они будут продолжать ставить данные в очередь до тех пор, пока не будет достигнута их максимальная отметка, а затем блокироваться до тех пор, пока не будет запущен новый приемник (обучение другой модели и т. д.). Основная забота о том, как распределяются работники, состоит в том, чтобы гарантировать, что данные отправляются достаточно случайным образом для правильного обучения модели. Другими словами, если все рабочие процессы генерируют одни и те же данные в одном и том же порядке и отправляют их в одно и то же время, обучающий пакет будет содержать очень маленькое подмножество данных, многократно скопированных, что нежелательно.

Рабочие генераторы данных

Рабочие имеют все данные изображения в подкаталогах, которые соответствуют их классу. Вот структура каталогов:

$ ls -1 nvmelinks/val_images/
HTC-1-M7
iPhone-4s
iPhone-6
LG-Nexus-5x
Motorola-Droid-Maxx
Motorola-Nexus-6
Motorola-X
Samsung-Galaxy-Note3
Samsung-Galaxy-S4
Sony-NEX-7

Внутри каждого подкаталога вы можете найти настоящие изображения, по 275 на класс.

$ ls -1 nvmelinks/train/HTC-1-M7/
(HTC-1-M7)100.jpg
(HTC-1-M7)101.jpg
(HTC-1-M7)102.jpg
(HTC-1-M7)103.jpg
(HTC-1-M7)104.jpg
(HTC-1-M7)105.jpg
(HTC-1-M7)106.jpg
[...]

В целях тестирования я также включил программу тестового приемника, которая просто отображает патчи изображения. Для этого теста я позволяю рабочим заполнить свои очереди до запуска тестового приемника. Затем я запускаю тестовый приемник, настроенный на 3 пакета по 50 исправлений изображений. Когда рабочие могут не отставать от раковины, мы можем увидеть довольно быстрые тайминги. Вы можете посмотреть в репозитории testsink_zmq.py

$ python testsink_zmq.py 
Get batch took: 19 msec
Get batch took: 4 msec
Get batch took: 4 msec

Вот пример того, что тестовый приемник показывает на экране:

Давайте посмотрим на полный исходный код для генераторов данных (для тех, у кого есть проблемы с сутью IOS, вы можете увидеть код непосредственно в репозитории: generator_train_zmq.py)

Строки 5–20 выполняют типичный импорт. Строка 23 импортирует базовый файл конфигурации, который устанавливает такие вещи, как расположение наборов данных, метки классов, коды манипуляций и т. д. Полный исходный код файла конфигурации можно найти в репозитории.

Строки 24–84 принимают параметры, файлы конфигурации и аргументы командной строки и завершают конфигурацию, используемую для запуска программы.

В строках 85–95 создается сокет ZeroMQ, который подключается к сокету PULL удаленного приемника.

В строке 96 выделяется буфер. В этом буфере будет находиться каждый патч изображения с соответствующим идентификатором класса и кодом манипуляции. См. строки 106, 112 и 177 о том, как мы заполняем буфер.

Строки 114–175 отвечают за загрузку изображений, любые манипуляции и выбор окончательного патча.

Осталось отправить буфер сообщений, что делается в строке 180.

Обучение модели Keras

Давайте посмотрим на его аналог. Учебная раковина Keras. В этом случае мы используем MobileNet. Файл также можно найти в репозитории train_mobilenet_zmq.py.

Выделим важные разделы.

В строках 83–125 определена функция генератора обучающих данных train_gen(). Как видите, буфер сообщений берется в строке 99, и после того, как я удостоверился, что он имеет правильную длину (строки 102–104), я беру данные и помещаю их в предварительно выделенные массивы. Как и все генераторы, используемые в model.fit_generator, функция выдает результаты (строка 125).

Строки 127–143 являются эквивалентным генератором данных проверки. Я решил оставить это как генератор, который подбирает патчи изображений из файлов базы данных HDF5 (которые открываются в строках 247–249). Хотя это выходит за рамки данного документа, за ним должно быть легко следовать. Вы можете посмотреть работу, выполняемую вспомогательной функцией hf.hdf5_get_rand_set, посмотрев файл библиотеки: lib/helperfunctions.py

В строках 237–244 я открываю сокет ZeroMQ PULL, который принимает все данные.

Остальная часть кода представляет собой типичное обучение модели Keras для MobileNet с использованием весов ImageNet и нового верхнего раздела для размещения количества классов, соответствующих нашим потребностям.

И это все?

Я надеюсь, что это краткое введение дало вам представление о том, как создать надежную, хорошо масштабируемую архитектуру. Но это должно быть только начало, к этому простому случаю можно добавить более продвинутые функции для создания адаптивного обучения. Чтобы узнать о других коммуникационных шаблонах помимо PUSH/PULL, показанных здесь, или чтобы узнать, на что способен ZeroMQ, обязательно прочтите ZeroMQ — руководство.