Пользовательский генератор данных Tensorflow2.x с многопроцессорной обработкой

Я только что обновился до tenorflow 2.3. Я хочу сделать собственный генератор данных для обучения. С tenorflow 1.x я сделал следующее:

def get_data_generator(test_flag):
  item_list = load_item_list(test_flag)
  print('data loaded')
  while True:
    X = []
    Y = []
    for _ in range(BATCH_SIZE):
      x, y = get_random_augmented_sample(item_list)
      X.append(x)
      Y.append(y)
    yield np.asarray(X), np.asarray(Y)

data_generator_train = get_data_generator(False)
data_generator_test = get_data_generator(True)
model.fit_generator(data_generator_train, validation_data=data_generator_test, 
                    epochs=10000, verbose=2,
                    use_multiprocessing=True,
                    workers=8,
                    validation_steps=100,
                    steps_per_epoch=500,
                    )

Этот код отлично работал с tensorflow 1.x. В системе создано 8 процессов. Процессор и видеокарта загрузились отлично. загруженные данные были напечатаны 8 раз.

С tenorflow 2.3 я получил предупреждение:

ПРЕДУПРЕЖДЕНИЕ: tensorflow: многопроцессорность может плохо взаимодействовать с TensorFlow, вызывая недетерминированные взаимоблокировки. Для высокопроизводительных конвейеров данных рекомендуется tf.data.

загруженные данные были напечатаны один раз (должно 8 раз). GPU загружен не полностью. У него также есть утечка памяти каждую эпоху, поэтому обучение будет остановлено через несколько эпох. Флаг use_multiprocessing не помог.

Как сделать генератор / итератор в тензорном потоке (keras) 2.x, который можно легко распараллелить между несколькими процессами ЦП? Тупики и порядок данных не важны.


person user1941407    schedule 14.10.2020    source источник
comment
Вы согласны с использованием tf.data конвейера? Как сказано в предупреждении, это можно распараллелить.   -  person jakub    schedule 17.10.2020
comment
Я пробовал использовать tf.keras.utils.Sequence. Этот класс работает и при необходимости распараллеливается. Проблема в том, что у меня много кода для tenorflow 1 с использованием стандартного генератора Python. Я хочу, чтобы этот код продолжал работать с tenorflow 2 без необходимости переписывать его.   -  person user1941407    schedule 19.10.2020
comment
Вы изучали tf.data.Dataset.from_generator? Вы, вероятно, сможете сделать tf.data.Dataset из существующего генератора. Я думаю, что генератор должен выдавать один образец на итерацию, а затем вы можете выполнить пакетную обработку с помощью dataset.batch(BATCH_SIZE).   -  person jakub    schedule 19.10.2020


Ответы (1)


В tf.data конвейере есть несколько мест, где вы можете распараллелить. В зависимости от того, как ваши данные хранятся и читаются, вы можете распараллелить чтение. Вы также можете распараллелить расширение и предварительно получить данные во время обучения, чтобы ваш графический процессор (или другое оборудование) никогда не нуждался в данных.

В приведенном ниже коде я продемонстрировал, как можно распараллелить расширение и добавить предварительную выборку.

import numpy as np
import tensorflow as tf

x_shape = (32, 32, 3)
y_shape = ()  # A single item (not array).
classes = 10

# This is tf.data.experimental.AUTOTUNE in older tensorflow.
AUTOTUNE = tf.data.AUTOTUNE

def generator_fn(n_samples):
    """Return a function that takes no arguments and returns a generator."""
    def generator():
        for i in range(n_samples):
            # Synthesize an image and a class label.
            x = np.random.random_sample(x_shape).astype(np.float32)
            y = np.random.randint(0, classes, size=y_shape, dtype=np.int32)
            yield x, y
    return generator

def augment(x, y):
    return x * tf.random.normal(shape=x_shape), y

samples = 10
batch_size = 5
epochs = 2

# Create dataset.
gen = generator_fn(n_samples=samples)
dataset = tf.data.Dataset.from_generator(
    generator=gen, 
    output_types=(np.float32, np.int32), 
    output_shapes=(x_shape, y_shape)
)
# Parallelize the augmentation.
dataset = dataset.map(
    augment, 
    num_parallel_calls=AUTOTUNE,
    # Order does not matter.
    deterministic=False
)
dataset = dataset.batch(batch_size, drop_remainder=True)
# Prefetch some batches.
dataset = dataset.prefetch(AUTOTUNE)

# Prepare model.
model = tf.keras.applications.VGG16(weights=None, input_shape=x_shape, classes=classes)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

# Train. Do not specify batch size because the dataset takes care of that.
model.fit(dataset, epochs=epochs)
person jakub    schedule 18.10.2020
comment
Можно ли использовать генератор tf.keras.utils.Sequence в tf.data.Dataset.from_generator? - person M.Innat; 06.03.2021
comment
@ M.Innat - Я уверен, что это возможно. Не стесняйтесь открывать новый вопрос. Если вы разместите ссылку здесь, я буду рад взглянуть. - person jakub; 06.03.2021
comment
Я видел, как с тем подходом, который вы показали, другие поступали так же. Но я думаю, что с генератором классов Sequence возникает мало проблем. Загляните сюда. - person M.Innat; 08.03.2021