В наше время технологии резко продвинулись вперед, одним из наиболее упоминаемых достижений является искусственный интеллект, особенно машинное обучение, которое является подветвью искусственного интеллекта «Машина учится, как ребенок».

Одним из недавно появившихся достижений являются знаменитые Трансформеры. Эти нейронные сети были опубликованы в 2017 году в статье под названием " Внимание — это все, что вам нужно», шутка этого заключается в том, чтобы заменить RNN LSTM, поскольку в краткосрочной перспективе у него происходит сбой памяти, что в больших масштабах влияет на нас.< br /> Трансформеры обращают внимание вместо того, чтобы делать это последовательно, из-за этого у него отсутствует кратковременная память. Если вы хотите узнать, как это работает, я оставляю видео, объясняющее работу этих https://towardsdatascience.com/illustrated-guide-to-transformers-step-by-step-explanation-f74876522bc0. Или вы можете также посмотрите видео

Я искал статью, в которой объяснялось бы, как применить преобразователи nn для создания автоматического распознавания речи, проблема в том, что ни один из этих были в Интернете, информация об этом приложении мало. Поэтому я решил создать и провести исследование самостоятельно.😊

Для выполнения этого проекта мы должны определить нашу цель, которая состоит в том, чтобы воспроизвести звук нашего голоса или голоса другого человека и распечатать текст в консоли, то есть прогноз в тексте.

что-то вроде этого…

где target — это текст данного аудио, а prediction — это текст прогноза или тот, который будет в нашей консоли.

Приступаем к программированию :)

Это шаги, которые мы собираемся выполнить…

  1. Загрузите базу данных или используйте ее в блокнотах Kaggle📖
  2. Обработать данные🧑‍💻
  3. Создайте структуру нейронной сети трансформеров.🏠
  4. Сохраните вес модели👍

В моем случае я собираюсь использовать блокноты Kaggle, потому что база данных, которую мы собираемся использовать, превышает 50 ГБ, поэтому обучение и обработка базы данных уже выполняется в Kaggle. .

Скачать базу данных

Мы собираемся использовать базу данных LibriSpeech. Если вы хотите использовать менее тяжелую базу данных, вы можете использовать LJ-Speech. Вы можете найти LibriSpeech здесь https://www.openslr.org/12/, мы собираемся использовать 360 часов и 100 часов, мы собираемся объединить их, чтобы они были 460 часов аудио.

Проблема с этой базой данных заключается в том, что все аудио находятся в формате .flac, который нам нужен .wav для их чтения и использования в производстве. . Но не волнуйтесь, я уже позаботился о том, чтобы преобразовать их все в wav с помощью созданного мною мини-скрипта. Вы можете найти базу данных в Kaggle, уже конвертированную в wav https://www.kaggle.com/datasets/showmik50/librispeech-asr-wav-dataset

Обработать данные

Сначала нам нужно импортировать зависимости, мы собираемся использовать для этого Keras и Tensorflow, на самом деле, если вы используете Pytorch, это та же теория, что и у Трансформеров.

import os
import random
from glob import glob 
import tensorflow as tf
from tensorflow import keras 
from tensorflow.keras import layers

Мы используем glob для доступа ко всем файлам .wav в адресе папки.

Теперь мы обрабатываем всю базу данных, выполняем цикл for, чтобы получить из нее метаданные и аудио, принадлежащие метаданным. Проще говоря, мы просто создаем словарь или список с адресом файла .wav и транскрипцией аудио себя, что-то вроде этого…

data = {"audio": "./001.wav", "text": "transcription of the audio"}

Вот код…

# Preprocess the dataset
saveto1 = "../input/librispeech-asr-wav-dataset/train-clean-100"
saveto2 = "../input/librispeech-asr-wav-dataset/train-clean-360"
flacs1 = glob("{}/*.wav".format(saveto1), recursive=True)
flacs2 = glob("{}/*.wav".format(saveto2), recursive=True)
metadata1 = glob("../input/librispeech-asr-wav-dataset/meta/train-clean-100/*.txt", recursive=True)
metadata2 = glob("../input/librispeech-asr-wav-dataset/meta/train-clean-360/*.txt", recursive=True)
# train-clean-100
id_to_text1 = {}
for i in metadata1:
    with open(i, encoding="utf-8") as f:
        for line in f:
            id = line.split(" ")[0]
            text_ = line.split(" ")
            text = " ".join(text_[1:])
            text = text.replace("\n", "")
            id_to_text1[id] = text
def get_data1(flacs1, id_to_text1, maxlen=50):
    """ returns mapping of audio paths and transcription texts """
    data1 = []
    for w in flacs1:
        id = w.split("/")[-1].split(".")[0]
        data1.append({"audio": w, "text": id_to_text1[id]})
    return data1
# train-clean-360
id_to_text2 = {}
for i in metadata2:
    with open(i, encoding="utf-8") as f:
        for line in f:
            id = line.split(" ")[0]
            text_ = line.split(" ")
            text = " ".join(text_[1:])
            id_to_text2[id] = text
def get_data2(flacs2, id_to_text2, maxlen=50):
    """ returns mapping of audio paths and transcription texts """
    data2 = []
    for w in flacs2:
        id = w.split("/")[-1].split(".")[0]
        data2.append({"audio": w, "text": id_to_text2[id]})
    return data2
​

Затем мы должны передать его тензору и заставить векторизатор преобразовывать текст в язык, понятный компьютеру также как обратный этому процессу, в конце присоедините словарь к тензору.

max_target_len = 200  # all transcripts in out data are < 200 characters
data1 = get_data1(flacs1, id_to_text1, max_target_len)
data2 = get_data2(flacs2, id_to_text2, max_target_len)
data = data1 + data2
vectorizer = VectorizeChar(max_target_len)
print("vocab size", len(vectorizer.get_vocabulary()))
# datasets of data 
def create_text_ds(data):
    texts = [_["text"] for _ in data]
    text_ds = [vectorizer(t) for t in texts]
    text_ds = tf.data.Dataset.from_tensor_slices(text_ds)
    return text_ds
def path_to_audio(path):
    # spectrogram using stft
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1)
    audio = tf.squeeze(audio, axis=-1)
    stfts = tf.signal.stft(audio, frame_length=200, frame_step=80, fft_length=256)
    x = tf.math.pow(tf.abs(stfts), 0.5)
    # normalisation
    means = tf.math.reduce_mean(x, 1, keepdims=True)
    stddevs = tf.math.reduce_std(x, 1, keepdims=True)
    x = (x - means) / stddevs
    audio_len = tf.shape(x)[0]
    # padding to 10 seconds
    pad_len = 2754
    paddings = tf.constant([[0, pad_len], [0, 0]])
    x = tf.pad(x, paddings, "CONSTANT")[:pad_len, :]
    x = tf.where(tf.math.is_nan(x), 0., x) # get rid of all nan values, avoid "loss:nan"
    return x
def create_audio_ds(data):
    flist = [_["audio"] for _ in data]
    audio_ds = tf.data.Dataset.from_tensor_slices(flist)
    audio_ds = audio_ds.map(
        path_to_audio, num_parallel_calls=tf.data.AUTOTUNE
    )
    return audio_ds
def create_tf_dataset(data, bs=4):
    audio_ds = create_audio_ds(data)
    text_ds = create_text_ds(data)
    ds = tf.data.Dataset.zip((audio_ds, text_ds))
    ds = ds.map(lambda x, y: {"source": x, "target": y})
    ds = ds.batch(bs)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds
split = int(len(data) * 0.99)
train_data = data[:split]
test_data = data[split:]
ds = create_tf_dataset(train_data, bs=64)
val_ds = create_tf_dataset(test_data, bs=4)

Я забыл упомянуть, что функция path_to_audio() используется для преобразования звука в спектрограммы с использованием математического процесса быстрого преобразования Фурье, поскольку в одной части модель использует Сверточные сети для звука, затем модель получает в качестве входных данных аудиоспектрограмму. Спектрограммы выглядят примерно так.

Создадим структуру нейронной сети трансформаторов

мы почти закончили!
Теперь мы собираемся создать структуру трансформеров, если вы не понимаете, как они работают или что я делаю, я рекомендую вам изучить их перед использованием. Мы собираемся создать Пользовательскую модель подкласса.

Мы начинаем кодировать Positional Embedding и Encoder часть модели, а также SpeechFeatureEmbedding для преобразования выходных тензоров позиционное встраивание в спектрограммы с использованием Conv NN.

# Transformer Input layer
class TokenEmbedding(layers.Layer):
    def __init__(self, num_vocab=1000, maxlen=100, num_hid=64):
        super().__init__()
        self.emb = tf.keras.layers.Embedding(num_vocab, num_hid)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)
def call(self, x):
        maxlen = tf.shape(x)[-1]
        x = self.emb(x)
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        return x + positions
class SpeechFeatureEmbedding(layers.Layer):
    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv1D(
            num_hid, 11, strides=2, padding="same", activation="relu"
        )
        self.conv2 = tf.keras.layers.Conv1D(
            num_hid, 11, strides=2, padding="same", activation="relu"
        )
        self.conv3 = tf.keras.layers.Conv1D(
            num_hid, 11, strides=2, padding="same", activation="relu"
        )
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)
def call(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        return self.conv3(x)
# Transformer Encoder Layer
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

Отлично, мы только что создали это…

Теперь мы должны создать часть Decoder, эта часть получает Key и Query, обработанные Encoder, давайте сделаем это.

Это выглядит примерно так…

#Transformer Decoder Layer
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.self_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.enc_att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.self_dropout = layers.Dropout(0.5)
        self.enc_dropout = layers.Dropout(0.1)
        self.ffn_dropout = layers.Dropout(0.1)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Masks the upper half of the dot product matrix in self attention.
This prevents flow of information from future tokens to current token.
        1's in the lower triangle, counting from the lower right corner.
        """
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)
def call(self, enc_out, target):
        input_shape = tf.shape(target)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        target_att = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(target + self.self_dropout(target_att))
        enc_out = self.enc_att(target_norm, enc_out)
        enc_out_norm = self.layernorm2(self.enc_dropout(enc_out) + target_norm)
        ffn_out = self.ffn(enc_out_norm)
        ffn_out_norm = self.layernorm3(enc_out_norm + self.ffn_dropout(ffn_out))
        return ffn_out_norm

Наконец, мы завершаем модель преобразователя, здесь мы соберем вместе декодер, кодировщик и функцию речи.

# Complete the Transformer model
class Transformer(keras.Model):
    def __init__(
        self,
        num_hid=64,
        num_head=2,
        num_feed_forward=128,
        source_maxlen=100,
        target_maxlen=100,
        num_layers_enc=4,
        num_layers_dec=1,
        num_classes=10,
    ):
        super().__init__()
        self.loss_metric = keras.metrics.Mean(name="loss")
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes
self.enc_input = SpeechFeatureEmbedding(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(
            num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid
        )
self.encoder = keras.Sequential(
            [self.enc_input]
            + [
                TransformerEncoder(num_hid, num_head, num_feed_forward)
                for _ in range(num_layers_enc)
            ]
        )
for i in range(num_layers_dec):
            setattr(
                self,
                f"dec_layer_{i}",
                TransformerDecoder(num_hid, num_head, num_feed_forward),
            )
self.classifier = layers.Dense(num_classes)
def decode(self, enc_out, target):
        y = self.dec_input(target)
        for i in range(self.num_layers_dec):
            y = getattr(self, f"dec_layer_{i}")(enc_out, y)
        return y
def call(self, inputs):
        source = inputs[0]
        target = inputs[1]
        x = self.encoder(source)
        y = self.decode(x, target)
        return self.classifier(y)
@property
    def metrics(self):
        return [self.loss_metric]
def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        source = batch["source"]
        target = batch["target"]
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        with tf.GradientTape() as tape:
            preds = self([source, dec_input])
            one_hot = tf.one_hot(dec_target, depth=self.num_classes)
            mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
            loss = self.compiled_loss(one_hot, preds, sample_weight=mask)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}
def test_step(self, batch):
        source = batch["source"]
        target = batch["target"]
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        preds = self([source, dec_input])
        one_hot = tf.one_hot(dec_target, depth=self.num_classes)
        mask = tf.math.logical_not(tf.math.equal(dec_target, 0))
        loss = self.compiled_loss(one_hot, preds, sample_weight=mask)
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result()}
def generate(self, source, target_start_token_idx):
        """Performs inference over one batch of inputs using greedy decoding."""
        bs = tf.shape(source)[0]
        enc = self.encoder(source)
        dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
        dec_logits = []
        for i in range(self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input)
            logits = self.classifier(dec_out)
            logits = tf.argmax(logits, axis=-1, output_type=tf.int32)
            last_logit = tf.expand_dims(logits[:, -1], axis=-1)
            dec_logits.append(last_logit)
            dec_input = tf.concat([dec_input, last_logit], axis=-1)
        return dec_input

Я знаю, что все это звучит и выглядит очень сложно, но я рекомендую вам глубоко изучить как код, так и трансформеры, чтобы вы знали этот код и не боялись его.

Мы определяем скорость обучения в классе, поскольку трансформеры используют настраиваемую скорость обучения с формулой, мы также собираем все вместе, чтобы можно было обучать нашу модель.

class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    def __init__(
        self,
        init_lr=0.00001,
        lr_after_warmup=0.001,
        final_lr=0.00001,
        warmup_epochs=15,
        decay_epochs=85,
        steps_per_epoch=203,
    ):
        super().__init__()
        self.init_lr = init_lr
        self.lr_after_warmup = lr_after_warmup
        self.final_lr = final_lr
        self.warmup_epochs = warmup_epochs
        self.decay_epochs = decay_epochs
        self.steps_per_epoch = steps_per_epoch
def calculate_lr(self, epoch):
        """ linear warm up - linear decay """
        warmup_lr = (
            self.init_lr
            + ((self.lr_after_warmup - self.init_lr) / (self.warmup_epochs - 1)) * epoch
        )
        decay_lr = tf.math.maximum(
            self.final_lr,
            self.lr_after_warmup
            - (epoch - self.warmup_epochs)
            * (self.lr_after_warmup - self.final_lr)
            / (self.decay_epochs),
        )
        return tf.math.minimum(warmup_lr, decay_lr)
def __call__(self, step):
        epoch = step // self.steps_per_epoch
        return self.calculate_lr(epoch)
batch = next(iter(val_ds))
model = Transformer(
    num_hid=200, 
    num_head=2, 
    num_feed_forward=400, 
    target_maxlen=max_target_len,
    num_layers_enc=4,
    num_layers_dec=1,
    num_classes=34,
) 
loss_fn = tf.keras.losses.CategoricalCrossentropy(
    from_logits=True, label_smoothing=0.1,
)
learning_rate = CustomSchedule(
    init_lr=0.00001, # 0.00001
    lr_after_warmup=0.001, # 0.001
    final_lr=0.00001, # 0.00001
    warmup_epochs=15,
    decay_epochs=85,
    steps_per_epoch=len(ds),
)
optimizer = keras.optimizers.Adam(learning_rate)

Готово! Теперь мы обучаем нашу модель как минимум 50 или 30 эпохам, и прогнозы начинают выглядеть лучше!

model.compile(optimizer=optimizer, loss=loss_fn)
history = model.fit(ds, validation_data=val_ds, epochs=100)
model.save("./speechrecognitionE.model", include_optimizer=False)

Мы также включаем model.save, потому что хотим сохранить модель, а затем использовать ее без переобучения. Важно то, что мы не можем включить оптимизатор, так как они не разрешены в программах, где они являются Пользовательскими моделями подклассов, а при загрузке модели следует загружать только веса .

Прогнозы!

Теперь самое интересное! Давайте спрогнозируем нашу модель, используя ее вот так…

model_path = "../input/35epochslibrispeech/speechrecognitionE.model"
model.load_weights(model_path)
path = "../input/english-data/LJSpeech-1.1/wavs/LJ001-0001.wav" # the audio
x = path_to_audio(path)
#print(x)
x = tf.expand_dims(x, axis=0)
print(x.shape)
idx_to_char = vectorizer.get_vocabulary()
preds = model.generate(x, 2)
preds = preds.numpy()
bs = tf.shape(x)[0]
for i in range(bs):
    prediction = ""
    for idx in preds[i, :]:
        prediction += idx_to_char[idx]
        if idx == 3:
            break
print("prediction: ", prediction)

Некоторые предсказания

Если у вас возникли проблемы с пониманием этой статьи или проблемы с кодом, отправьте мне электронное письмо или свяжитесь со мной. https://bernardoolisan.com

[email protected]

Надеюсь, вам понравится :)