Mutex, сокращение от взаимного исключения, - это инструмент для управления параллелизмом в программном обеспечении. Бывают случаи, когда вам нужно выполнить последовательность задач в программном приложении, и после того, как одна из них запустила последовательность, вы не хотите, чтобы другая запускалась, пока первая не будет завершена.
Возьмем, к примеру, задачу публикации мультимедиа, когда пользователь загружает мультимедийный контент (изображение / аудио / видео). Сервер должен обработать этот носитель перед фактической публикацией, возможно, для уменьшения размера и выполнения проверок соответствия. Пользователю не должно быть разрешено публиковать другой артефакт, пока первый находится в обработке. Последовательность задач может быть такой:
- Потоковая передача мультимедиа и запись в хранилище. Назовем это артефактом.
- Запустите службу изменения размера артефакта.
- Отправьте артефакт на комплаенс-задания, дождитесь их ответа.
- Опубликуйте артефакт.
В зависимости от программной среды может быть несколько простых решений. Например, если сервер представляет собой монолит с одним узлом и одним процессом, беспокоиться не о чем. Запросы будут поставлены в очередь и в любом случае будут обрабатываться по одному. Если среда бессерверная и основана на микросервисах, все будет немного интереснее. Еще один нюанс - блокировка распространяется только на конкретного пользователя. Служба, вероятно, должна разрешить другим пользователям публиковать, пока идет обработка для несвязанных пользователей.
Первый инстинкт - создать замок. Как только запрос на публикацию получен, пользователь заблокирован от публикации чего-либо. После завершения задачи блокировка будет снята.
def publish(user_id: int, stream: StreamIO): if not is_locked(user_id): lock(user_id) #do the tasks unlock(user_id)
Суть решения такова. Однако есть проблема в том, что два запроса «одновременно» (могут быть на двух разных узлах / экземплярах) вошли в блок if, и теперь оба они думают, что заблокировали пользователя. Идея состоит в том, чтобы использовать атомарную операцию для is_locked () и lock (). Что-то вроде этого:
def publish(user_id: int, stream: StreamIO): lck = check_and_lock(userId) if not lock: return 'Already locked!' #do the tasks unlock(userId)
MongoDB обеспечивает атомарность на уровне документа. Если для каждого пользователя существует один документ, функция блокировки может быть реализована следующим образом:
class Lock: def __init__(mongo_client): self.collection = mongo_client[_db_name][_collection_name] def _toggle(self, resource_id: str, value: bool) -> bool: res = self.collection.update_one( {'_id': resource_id, 'locked': not value}, {'$set': {'locked': value}}, ) if res.modified_count == 0: return False if res.modified_count == 1: return True raise Exception('Multiple documents found!') def lock(self, resource_id: str) -> bool: return self._toggle(resource_id, True) def unlock(self, resource_id: str) -> bool: return self._toggle(resource_id, False) ... def publish(user_id: int, stream: StreamIO): lock = Lock(mongo_client) is_locked = Lock.lock(str(user_id)) if not is_locked: return 'Already locked!' #do the tasks lock.release(str(user_id)) return 'Success!'
Мы почти на месте. Бывают случаи, когда пользователи впервые публикуют документы, у которых нет документа в коллекции MongoDB. Чтобы избежать сбоев, MongoDB имеет функцию upsert, которая придет на помощь:
class Lock: def __init__(mongo_client): self.collection = mongo_client[_db_name][_collection_name] def lock(self, resource_id: str) -> bool: try: res = self.collection.update_one( {'_id': resource_id, 'locked': False}, {'$set': {'locked': True}}, upsert=True ) assert res.modified_count == 1 or res.raw_result['n'] == 1 return True except pymongo.errors.DuplicateKeyError: print(f'Lock exists on resource_id: {resource_id}') return False def unlock(self, resource_id: str) -> bool: res = self.collection.update_one( {'_id': resource_id, 'locked': True}, {'$set': {'locked': False}} ) return res.modified_count == 1
Здесь на картинке появляется try-except, потому что upsert будет проверять документ с запросом
{‘_id’: user_id, 'locked': False}
Этот запрос может вернуть НУЛЬ результатов при двух условиях:
- Нет документов для _id = user_id
- Нет документа для _id = userId И заблокировано = false
Первый - это наш новый угловой случай пользователя, где нам нужно просто создать новый документ. Во втором случае user_id уже заблокирован. В последнем случае запрос update_one + upsert попытается создать документ, но вызовет исключение дублирования ключа, поскольку он не может создать второй документ для пользователя. Предполагается, что _id - единственный уникальный ключ в коллекции.
Теперь у нас есть полностью работоспособное решение. Одна неприятность - это обработка ошибок. Предположим, что одна из задач в последовательности не работает или вызывает исключения, блокировку необходимо снять. Поймать все исключения и снять блокировку - это не изящно. Вот питонический способ сделать это с with.
class LockUnavailableException(Exception): pass class LockContext: def __init__(self, lock: Lock, resource_id: str): self.lock = lock self.resource_id = resource_id def __enter__(self): if self.lock.take(self.resource_id): print(f'Lock taken - {self.resource_id}') else: raise LockUnavailableException(f'For resource - {self.resource_id}') def __exit__(self, exc_type, exc_val, exc_tb): self.lock.unlock(self.resource_id) print(f'Unlocked - {self.resource_id}') ... def publish(user_id, stream: StreamIO): lock = Lock(mongo_client) with LockContext(lock, str(user_id)): #do the tasks & don't worry!
Благодарим Прамода Кумара за мозговой штурм и внедрение этого решения на работе.
В большинстве случаев разработка программного обеспечения - это кропотливая работа с большим количеством поисков в Google и отладки. Дизайн и архитектура программного обеспечения в основном возникают в проектах с нуля, что случается нечасто. Скорее всего, повседневная задача - просмотреть чужой код, выяснить, почему он что-то делает или не делает, и заставить его делать что-то еще. Однако однажды в синей луне вы столкнетесь с подобными проблемами, которые заставят вас изучить / заново изучить / реализовать интересные концепции.