Служебная шина Azure - AutoRenewTimeout для сообщений для клиента Python

Я просматривал документацию по функции AutoRenewTimeout службы и наткнулся на этот пост на Продление блокировки Политика в отношении сообщений служебной шины.

В нем говорится о функции AutoRenewTimeout, когда сообщение блокируется на определенный период времени, так что подписчик может завершить обработку сообщения, или сообщение получает тайм-аут (неспособность обработать сообщение в течение заданного периода времени. ), под которым сообщение будет видно другим подписчикам, читающим из той же подписки.

Мне не удалось найти эту функцию в Microsoft Azure SDK для Python. Я просмотрел исходный код, и он говорит только о Ручное возобновление блокировки конкретного сообщения.

Мой вариант использования выглядит следующим образом

  • Мне нужно читать и обрабатывать сообщения из служебной шины.
  • Забудьте об этих обработанных сообщениях в базе данных (MongoDB в моем случае)
  • Сообщение, отправляемое в служебную шину, может достигать 1 миллиона событий в час (поэтому невозможно отследить, когда для данного сообщения истечет время ожидания и запустить обновление вручную для этого же).
  • Все обработанные сообщения помещаются во временный список.
  • Всякий раз, когда приведенный выше список превышает определенный порог, выполните массовую вставку в БД.

Вот что я придумал. Это без политики обновления блокировки, о которой я говорил. Я просто запускаю удаление, когда сообщение обрабатывается.

class Event:
    def __read_subscription_message(self):
        try:
            message = self.bus_service.receive_subscription_message(
                self.topic_name, self.subscription_name, peek_lock=True)
            return message
        except Exception as e:
            self.logger.exception("Exception occurred!!!")

    def start_listner(self, task_number=0):
        self.logger.info('Task: %s, started listening to service bus messages' % task_number)
        while True:
            msg = self.__read_subscription_message()
            if msg and msg.body is not None:
                self.currentBackOff = 0
                self.process_event(msg, task_number)
                gevent.sleep(0)

    def process_event(self, msg, task_number=0):
        try:
            if msg.body:
                # message = json.loads(msg.body.decode())
                message = self.deserialize_message_body(msg.body)
                custom_properties = msg.custom_properties
                # Business logic implemented................
                # After processing a message append this to a temp list. Make
                # an insert which length of this list reaches a given threshold
                # by calling "write_to_storage(self, task_number=0)"
                self.bulk_records.append(record)
                msg.delete()
            else:
                self.logger.info("Message received: %s, is of type: %s" % (msg.body, type(msg.body)))
            self.total += 1
        except DeSerializationException as e:
            self.logger.info("Not able to de-serialize message: %s" % msg.body)
            self.logger.exception(e)
        except Exception as e:
            self.logger.exception(e)

    def write_to_storage(self, task_number=0):
        # Write to DB

Он работает нормально, но в случае, когда мой процесс подписчика будет убит, все те сообщения, которые находятся в моем временном ведре (которые не записаны в базу данных), были потеряны. Я хочу запустить руководство "message.delete()" для сообщений, когда они записываются в базу данных. Я думаю, что AutoRenewLock - это правильный путь, поскольку максимальное значение длительности блокировки сообщения составляет 5 минут, что не поможет в моем случае.

Спасибо


person Anurag Sharma    schedule 10.01.2018    source источник
comment
Этот временный список, который вы упомянули выше, сохраняется в ОЗУ или в каком-то постоянном хранилище?   -  person Gaurav Mantri    schedule 10.01.2018
comment
Временный список хранится в ОЗУ (это простая структура данных списка в Python). Я думаю, что запись каждой записи во временный файл (перед записью ее в db оптом) более обременительна, потому что я должен обрабатывать события, как только они приходят, чтобы избежать обратного заполнения подписки, также если процессы будут остановлены в в середине, то у меня остается поврежденный файл (потому что файл не был должным образом закрыт) или может произойти что-то подобное.   -  person Anurag Sharma    schedule 10.01.2018
comment
docs.microsoft. com / en-us / python / api / azure-servicebus / Вот документация AutolockRenewer для v7 служебной шины   -  person rakshith91    schedule 11.01.2021


Ответы (1)


Таким образом, вам не нужно записывать сообщение во временный журнал. Если вы используете блокировку просмотра и срок блокировки истекает, сообщение должно вернуться в очередь и быть получено в следующий раз. Если вы используете блокировку просмотра, вам нужно вызвать message.delete (), который вы делаете в своем коде, только после этого его следует удалить из брокера. Этот пример .net показывает, как он должен работать: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues.

Для удобства: // Заполните сообщение, чтобы оно больше не приходило. // Это можно сделать, только если queueClient создан в режиме ReceiveMode.PeekLock (по умолчанию).

ожидание queueClient.CompleteAsync (message.SystemProperties.LockToken);

Также см. Здесь, как должна работать блокировка просмотра: https://docs.microsoft.com/en-us/rest/api/servicebus/peek-lock-message-non-destructive-read

Поэтому, если вы не вызовете msg.delete, сообщение не будет считаться завершенным. У вас несколько подписчиков на одну подписку или тему?

Независимо от того, пока вы не вызываете удаление, сообщения должны просто вернуться в очередь, и ваш следующий входящий вызов заберет их. Если вы не получите ответа несколько раз, они могут стать мертвыми. https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues

Используйте обозреватель служебной шины, чтобы узнать, действительно ли вы теряете сообщения, или они просто возвращаются в очередь, или получают мертвую букву: https://github.com/paolosalvatori/ServiceBusExplorer/releases

person Christian Wolf    schedule 16.01.2018