Mutex, сокращение от взаимного исключения, - это инструмент для управления параллелизмом в программном обеспечении. Бывают случаи, когда вам нужно выполнить последовательность задач в программном приложении, и после того, как одна из них запустила последовательность, вы не хотите, чтобы другая запускалась, пока первая не будет завершена.

Возьмем, к примеру, задачу публикации мультимедиа, когда пользователь загружает мультимедийный контент (изображение / аудио / видео). Сервер должен обработать этот носитель перед фактической публикацией, возможно, для уменьшения размера и выполнения проверок соответствия. Пользователю не должно быть разрешено публиковать другой артефакт, пока первый находится в обработке. Последовательность задач может быть такой:

  1. Потоковая передача мультимедиа и запись в хранилище. Назовем это артефактом.
  2. Запустите службу изменения размера артефакта.
  3. Отправьте артефакт на комплаенс-задания, дождитесь их ответа.
  4. Опубликуйте артефакт.

В зависимости от программной среды может быть несколько простых решений. Например, если сервер представляет собой монолит с одним узлом и одним процессом, беспокоиться не о чем. Запросы будут поставлены в очередь и в любом случае будут обрабатываться по одному. Если среда бессерверная и основана на микросервисах, все будет немного интереснее. Еще один нюанс - блокировка распространяется только на конкретного пользователя. Служба, вероятно, должна разрешить другим пользователям публиковать, пока идет обработка для несвязанных пользователей.

Первый инстинкт - создать замок. Как только запрос на публикацию получен, пользователь заблокирован от публикации чего-либо. После завершения задачи блокировка будет снята.

def publish(user_id: int, stream: StreamIO):
    if not is_locked(user_id):
        lock(user_id)
        #do the tasks
        unlock(user_id)

Суть решения такова. Однако есть проблема в том, что два запроса «одновременно» (могут быть на двух разных узлах / экземплярах) вошли в блок if, и теперь оба они думают, что заблокировали пользователя. Идея состоит в том, чтобы использовать атомарную операцию для is_locked () и lock (). Что-то вроде этого:

def publish(user_id: int, stream: StreamIO):
    lck = check_and_lock(userId)
    if not lock:
        return 'Already locked!'
    #do the tasks
    unlock(userId)

MongoDB обеспечивает атомарность на уровне документа. Если для каждого пользователя существует один документ, функция блокировки может быть реализована следующим образом:

class Lock:
    def __init__(mongo_client):
        self.collection = mongo_client[_db_name][_collection_name]
    def _toggle(self, resource_id: str, value: bool) -> bool:
        res = self.collection.update_one(
            {'_id': resource_id, 'locked': not value},
            {'$set': {'locked': value}},
        )
        if res.modified_count == 0:
            return False
        if res.modified_count == 1:
            return True
        raise Exception('Multiple documents found!')
    def lock(self, resource_id: str) -> bool:
        return self._toggle(resource_id, True)
    def unlock(self, resource_id: str) -> bool:
        return self._toggle(resource_id, False)
...
def publish(user_id: int, stream: StreamIO):
    lock = Lock(mongo_client)
    is_locked = Lock.lock(str(user_id))
    if not is_locked:
        return 'Already locked!'
    #do the tasks
    lock.release(str(user_id))
    return 'Success!'

Мы почти на месте. Бывают случаи, когда пользователи впервые публикуют документы, у которых нет документа в коллекции MongoDB. Чтобы избежать сбоев, MongoDB имеет функцию upsert, которая придет на помощь:

class Lock:
    def __init__(mongo_client):
        self.collection = mongo_client[_db_name][_collection_name]
    def lock(self, resource_id: str) -> bool:
        try:
            res = self.collection.update_one(
                {'_id': resource_id, 'locked': False},
                {'$set': {'locked': True}},
                upsert=True
            )
            assert res.modified_count == 1 or res.raw_result['n'] == 1
            return True
        except pymongo.errors.DuplicateKeyError:
            print(f'Lock exists on resource_id: {resource_id}')
            return False
    def unlock(self, resource_id: str) -> bool:
        res = self.collection.update_one(
            {'_id': resource_id, 'locked': True},
            {'$set': {'locked': False}}
        )
        return res.modified_count == 1

Здесь на картинке появляется try-except, потому что upsert будет проверять документ с запросом

{‘_id’: user_id, 'locked': False}

Этот запрос может вернуть НУЛЬ результатов при двух условиях:

  1. Нет документов для _id = user_id
  2. Нет документа для _id = userId И заблокировано = false

Первый - это наш новый угловой случай пользователя, где нам нужно просто создать новый документ. Во втором случае user_id уже заблокирован. В последнем случае запрос update_one + upsert попытается создать документ, но вызовет исключение дублирования ключа, поскольку он не может создать второй документ для пользователя. Предполагается, что _id - единственный уникальный ключ в коллекции.

Теперь у нас есть полностью работоспособное решение. Одна неприятность - это обработка ошибок. Предположим, что одна из задач в последовательности не работает или вызывает исключения, блокировку необходимо снять. Поймать все исключения и снять блокировку - это не изящно. Вот питонический способ сделать это с with.

class LockUnavailableException(Exception):
    pass
class LockContext:
    def __init__(self, lock: Lock, resource_id: str):
        self.lock = lock
        self.resource_id = resource_id
    def __enter__(self):
        if self.lock.take(self.resource_id):
            print(f'Lock taken - {self.resource_id}')
        else:
            raise LockUnavailableException(f'For resource - {self.resource_id}')
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.unlock(self.resource_id)
        print(f'Unlocked - {self.resource_id}')
...
def publish(user_id, stream: StreamIO):
    lock = Lock(mongo_client)
    with LockContext(lock, str(user_id)):
        #do the tasks & don't worry!

Благодарим Прамода Кумара за мозговой штурм и внедрение этого решения на работе.

В большинстве случаев разработка программного обеспечения - это кропотливая работа с большим количеством поисков в Google и отладки. Дизайн и архитектура программного обеспечения в основном возникают в проектах с нуля, что случается нечасто. Скорее всего, повседневная задача - просмотреть чужой код, выяснить, почему он что-то делает или не делает, и заставить его делать что-то еще. Однако однажды в синей луне вы столкнетесь с подобными проблемами, которые заставят вас изучить / заново изучить / реализовать интересные концепции.