Что такое сиамская сеть?

Сиамская нейронная сеть (SNN) — это класс архитектуры нейронной сети, который содержит две или более идентичных подсети. «Идентичные» здесь означают, что они имеют одинаковую конфигурацию, параметры и веса. Обновление параметров зеркально отражается в обеих подсетях и используется для поиска сходства между входными данными путем сравнения векторов признаков.

Почему сиамская сеть?

  • Нейронные сети хороши почти во всех задачах, но они полагаются на все больше и больше данных, чтобы работать хорошо. Мы не всегда можем рассчитывать на получение большего количества данных для определенных задач, таких как распознавание лиц и проверка подписи. Для решения подобных задач у нас есть новый тип архитектуры нейронных сетей, который называется сиамскими сетями.
  • Эти сети меньше занимаются распознаванием признаков, но нацелены на поиск сходства между двумя изображениями.
  • Эти типы сетей называются однократными учащимися.

Каковы преимущества его использования?

👉 Они способны изучать общие функции изображений, полезные для прогнозирования неизвестных распределений классов, даже когда доступно очень мало примеров из этих новых распределений.

👉 Они легко обучаются с помощью стандартных методов оптимизации на парах, выбранных из исходных данных.

👉 Они обеспечивают конкурентоспособный подход, который не опирается на предметно-ориентированные знания, а вместо этого использует методы глубокого обучения.

Приступаем к реализации

Вот шаги, которые нам нужно выполнить, чтобы построить модель:

  1. Импорт всех необходимых библиотек
  2. Сбор данных
  3. Подготовка данных
  4. Создание сети
  5. Обучение сети
  6. Создание приложения Kivy с использованием OpenCV и Tensorflow

Импорт всех необходимых библиотек

import cv2
import numpy as np
from matplotlib import pyplot as plt
import random
import os

# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

# Import uuid library to generate unique image names
import uuid

#Checking for GPU in your device
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

Как работают сиамские нейронные сети?

Мы передаем 2 изображения (якорное и проверочное изображение) и проверяем, похожи ли изображения, сходство вычисляется по слою расстояния, если они очень похожи, выход равен 1, иначе 0.

Сбор данных

Для сбора данных я использовал блокнот Jupyter, вы также можете использовать googlecolab.

Сначала мы настраиваем пути, а затем создаем папку данных, в которой мы настраиваем 3 папки с положительными отрицательными и якорными изображениями.

Папки Positive и Anchor будут содержать изображения одного и того же человека:

# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')
# Make the directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

Для негативных изображений использовался дикий набор данных, который можно скачать по этой ссылке:



# Uncompress Tar GZ Labelled Faces in the Wild Dataset
!tar -xf lfw.tgz
# Move LFW Images to the following repository data/negative
# we form a loop through each folder in the lfw directory 
# since some of the folders have multiple images in them
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)
# Import uuid library to generate unique image names
import uuid
# Establish a connection to the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened(): 
    ret, frame = cap.read()
   
    # Cut down frame to 250x250px
    frame = frame[120:120+250,200:200+250, :]
    # all color channels from 120 it will start
    
    # Collect anchors 
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # Create the unique file path 
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out anchor image
        cv2.imwrite(imgname, frame)
    
    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # Create the unique file path 
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out positive image
        cv2.imwrite(imgname, frame)
    
    # Show image back to screen
    cv2.imshow('Image Collection', frame)
    
    # Breaking gracefully
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()

Нажмите «a» для сохранения изображений в папке привязки, «p» для сбора изображений в папке «postives» и «q» для выхода из камеры.

Предварительная обработка и загрузка

Для этого я загрузил папки в гугл диск и подключил его к колабу

# Connecting with gdrive to access dataset
from google.colab import drive
drive.mount('/content/drive')

Теперь подключаемся к графическому процессору Google и проверяем, подключен ли он

# Accessing google colabs GPU if availabe

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)
gpus

Создание функции для увеличения изображений

def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        # img = tf.image.stateless_random_crop(img, size=(20,20,3), seed=(1,2))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

Применение функции увеличения в изображениях Anchor

img_path = os.path.join(ANC_PATH, '924e839c-135f-11ec-b54e-a0cec8d2d278.jpg')
img = cv2.imread(img_path)
augmented_images = data_aug(img)

for image in augmented_images:
    cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

Применение функции увеличения в Positive Images

for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())
# Taking 300 images from the augmented images
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)

Создание функции предварительной обработки для изменения размера и нормализации изображений

def preprocess(file_path):
    
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1 
    img = img / 255.0

    # Return image
    return img
img = preprocess('/content/drive/MyDrive/data/anchor/d451e2a6-b5fd-11ed-b3b9-97e548441b72.jpg')

Сжатие данных в пары (привязка, положительные) и (привязка, отрицательные) пакетов. Наконец, мы объединяем оба пакета в один набор данных.

positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)
#to iterate and 
samples = data.as_numpy_iterator()
exampple = samples.next()
exampple

Обучение и тестирование зданий

Предварительная обработка изображений в новом наборе данных

Создание пакетов данных и разделение на обучающий и тестовый наборы

# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

Структура встраиваемого слоя

def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

embedding = make_embedding()
embedding.summary()

Создание слоя расстояния, чтобы найти разницу между обоими слоями встраивания

# Siamese L1 Distance class

class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)
l1 = L1Dist()

Изготовление сиамской модели

Соединение всех частей для создания модели

def make_siamese_model(): 
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')
siamese_model = make_siamese_model()
siamese_model.summary()

Обучение

Настройка потерь и оптимизатора

binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

Что такое энтропия/логарифм потерь Binary Cross и почему мы это используем?

Бинарная перекрестная энтропия — это функция потерь, используемая в машинном обучении для измерения разницы между двумя распределениями вероятностей. В частности, он используется для измерения разницы между прогнозируемыми вероятностями и истинными вероятностями задачи бинарной классификации, где на выходе может быть одно из двух возможных значений (например, да или нет, правда или ложь).

Двоичная перекрестная энтропия рассчитывается путем умножения отрицательной суммы истинных вероятностей на логарифм предсказанных вероятностей, плюс истинные вероятности, умноженные на логарифм единицы минус предсказанные вероятности. В результате получается скалярное значение, представляющее разницу между предсказанной и истинной вероятностью.

Одно из преимуществ бинарной перекрестной энтропии заключается в том, что она более сильно наказывает модель за уверенные неверные прогнозы, в отличие от менее уверенных неверных прогнозов. Это связано с тем, что логарифм малой прогнозируемой вероятности (т. е. прогноза с низкой достоверностью) представляет собой большое отрицательное значение, тогда как логарифм большой прогнозируемой вероятности (т. е. прогноз с высокой достоверностью) представляет собой небольшое отрицательное значение.

Другие функции потерь, используемые в машинном обучении, включают среднеквадратичную ошибку, среднюю абсолютную ошибку и категориальную перекрестную энтропию. Среднеквадратическая ошибка и средняя абсолютная ошибка используются для задач регрессии, где выход может быть непрерывным значением, а не двоичным значением. Категориальная перекрестная энтропия используется для задач классификации с несколькими классами, где на выходе может быть одно из нескольких возможных значений.

Итак, бинарная кросс-энтропия — это функция потерь, используемая в задачах бинарной классификации для измерения разницы между предсказанными вероятностями и истинными вероятностями. Она отличается от других функций потерь тем, что более строго наказывает модель за уверенные неверные прогнозы.

Для подробного понимания: https://youtu.be/uIXpftEh5IQ

Что такое оптимизатор? Почему это используется? Почему используется оптимизатор Adam?

В машинном обучении оптимизатор — это алгоритм, который используется для настройки весов и смещений нейронной сети во время обучения, чтобы минимизировать ошибку между прогнозируемыми и истинными выходными данными. Цель оптимизатора — найти значения весов и смещений, дающие наименьшую возможную ошибку.

Оптимизаторы используются потому, что они помогают нейронной сети обучаться быстрее и эффективнее. Они достигают этого путем корректировки весов и смещений таким образом, чтобы свести к минимуму ошибку между предсказанными и истинными выходными данными, что, в свою очередь, помогает нейронной сети делать более точные прогнозы.

Оптимизатор Адама — популярный оптимизатор, используемый в машинном обучении. Это расширение стохастического градиентного спуска (SGD), которое сочетает в себе преимущества двух других оптимизаторов: AdaGrad и RMSProp. Оптимизатор Adam известен своей быстрой сходимостью, хорошей производительностью при решении широкого круга задач и простотой использования. Обычно это хороший выбор для большинства приложений глубокого обучения, особенно когда набор данных большой.

Другие оптимизаторы, обычно используемые в машинном обучении, включают:

1. Стохастический градиентный спуск (SGD)

2. Оптимизатор Адаград

3. Оптимизатор RMSProp

4. Оптимизатор Ададельта

Выбор оптимизатора зависит от конкретной решаемой задачи и характеристик набора данных. В общем, рекомендуется попробовать несколько оптимизаторов и сравнить их производительность на проверочном наборе, чтобы выбрать лучший для данной проблемы.

Определение контрольных точек во время обучения модели

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

Что такое контрольные точки?

Контрольные точки обучения — это моментальные снимки параметров модели машинного обучения в определенный момент времени в процессе обучения. Они представляют собой «контрольную точку» в процессе обучения, позволяя модели возобновить обучение с этой точки, если обучение было прервано, или восстановить параметры модели для оценки или вывода.

Во время обучения параметры модели обновляются итеративно, поскольку алгоритм пытается минимизировать ошибку между прогнозируемым выходом и истинным выходом для заданного набора входных данных. Контрольные точки обучения обычно сохраняются периодически, после определенного количества итераций обучения или через определенное время, чтобы можно было отслеживать прогресс модели и чтобы при необходимости можно было восстановить параметры модели.

Используя обучающие контрольные точки, инженеры по машинному обучению и специалисты по данным могут сэкономить время и вычислительные ресурсы, поскольку им не нужно переобучать модель с нуля, если во время обучения что-то пойдет не так. Они также могут использовать контрольные точки для оценки производительности модели на различных этапах процесса обучения или для точной настройки параметров модели, если это необходимо.

Построение тренировочного шага

test_batch = train_data.as_numpy_iterator()
batch_1 = test_batch.next()
X = batch_1[:2]
y = batch_1[2]

Мы определяем пользовательскую функцию обучения, поскольку наша модель не является последовательной по своей природе.

@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss

Для чего используется градиентная лента?

В TensorFlow градиентная лента используется для записи операций, выполняемых на входах функции во время прямого распространения, а затем автоматически вычисляет градиенты функции по отношению к ее входам во время обратного распространения.

Создайте тренировочный цикл

# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating a metric object 
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

Обучение модели

EPOCHS = 3
train(train_data, EPOCHS)

Делать прогнозы

# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

Сохранение модели

# Save weights
siamese_model.save('siamesemodel.h5')

Создание приложения Kivy

Для этого нам нужно создать 2 файла python:

  • faceid.py
  • Слои.py

faceid.py

# Import kivy dependencies first
from kivy.app import App
from kivy.uix.boxlayout import BoxLayout

# Import kivy UX components
from kivy.uix.image import Image
from kivy.uix.button import Button
from kivy.uix.label import Label

# Import other kivy stuff
# real time feed from kivy app
from kivy.clock import Clock
from kivy.graphics.texture import Texture
from kivy.logger import Logger

# Import other dependencies
import cv2
import tensorflow as tf
from Layers import L1Dist
import os
import numpy as np


# Build app and layout 
class CamApp(App):

    def build(self):
        # Main layout components 
        self.web_cam = Image(size_hint=(1,.8))
        self.button = Button(text="Verify", on_press=self.verify, size_hint=(1,.1))
        self.verification_label = Label(text="Verification Uninitiated", size_hint=(1,.1))

        # Add items to layout
        layout = BoxLayout(orientation='vertical')
        layout.add_widget(self.web_cam)
        layout.add_widget(self.button)
        layout.add_widget(self.verification_label)

        # Load tensorflow/keras model
        self.model = tf.keras.models.load_model('siamesemodel.h5', custom_objects={'L1Dist':L1Dist})

        # Setup video capture device
        self.capture = cv2.VideoCapture(0)
        Clock.schedule_interval(self.update, 1.0/33.0)
        
        return layout

    # Run continuously to get webcam feed
    def update(self, *args):

        # Read frame from opencv
        ret, frame = self.capture.read()
        frame = frame[120:120+250, 200:200+250, :]

        # Flip horizontall and convert image to texture
        buf = cv2.flip(frame, 0).tostring()
        img_texture = Texture.create(size=(frame.shape[1], frame.shape[0]), colorfmt='bgr')
        img_texture.blit_buffer(buf, colorfmt='bgr', bufferfmt='ubyte')
        self.web_cam.texture = img_texture

    # Load image from file and conver to 100x100px
    def preprocess(self, file_path):
        # Read in image from file path
        byte_img = tf.io.read_file(file_path)
        # Load in the image 
        img = tf.io.decode_jpeg(byte_img)
        
        # Preprocessing steps - resizing the image to be 100x100x3
        img = tf.image.resize(img, (100,100))
        # Scale image to be between 0 and 1 
        img = img / 255.0
        
        # Return image
        return img

    # Verification function to verify person
    def verify(self, *args):
        # Specify thresholds
        detection_threshold = 0.5
        verification_threshold = 0.5

        # Capture input image from our webcam
        SAVE_PATH = os.path.join('application_data', 'input_image', 'input_image.jpg')
        ret, frame = self.capture.read()
        frame = frame[120:120+250, 200:200+250, :]
        cv2.imwrite(SAVE_PATH, frame)

        # Build results array
        results = []
        for image in os.listdir(os.path.join('application_data', 'verification_images')):
            input_img = self.preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
            validation_img = self.preprocess(os.path.join('application_data', 'verification_images', image))
            
            # Make Predictions 
            result = self.model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
            results.append(result)
        
        # Detection Threshold: Metric above which a prediciton is considered positive 
        detection = np.sum(np.array(results) > detection_threshold)
        
        # Verification Threshold: Proportion of positive predictions / total positive samples 
        verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images')))
        verified = verification > verification_threshold

        # Set verification text 
        self.verification_label.text = 'Verified' if verified == True else 'Unverified'

        # Log out details
        Logger.info(results)
        Logger.info(detection)
        Logger.info(verification)
        Logger.info(verified)

        
        return results, verified


if __name__ == '__main__':
    CamApp().run()

Layers.py

# WHY DO WE NEED THIS: its needed to load the custom model

# Import dependencies
import tensorflow as tf
from tensorflow.keras.layers import Layer

# Custom L1 Distance Layer from Jupyter 
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

Заключение

Вот как выглядит наше финальное приложение:

Проблемы, с которыми столкнулись

  • Переобучение модели
  • Разнообразие данных

Заключение

Одноразовое обучение имеет возможность точно классифицировать или распознавать новые объекты или шаблоны на основе всего одного или нескольких примеров этого объекта или шаблона.

Это приложение обучается только на изображениях одного человека и занимает сравнительно меньше времени, а также требует меньше данных, чем другие методы проверки, поэтому одним из возможных его применений является блокировка проверки лица на мобильных устройствах.

Рекомендации

Исследовательская работа:



YouTube видео:

https://www.youtube.com/playlist?list=PLgNJO2hghbmhHuhURAGbe6KWpiYZt0AMH