Я просматривал документацию по функции AutoRenewTimeout службы и наткнулся на этот пост на Продление блокировки Политика в отношении сообщений служебной шины.
В нем говорится о функции AutoRenewTimeout, когда сообщение блокируется на определенный период времени, так что подписчик может завершить обработку сообщения, или сообщение получает тайм-аут (неспособность обработать сообщение в течение заданного периода времени. ), под которым сообщение будет видно другим подписчикам, читающим из той же подписки.
Мне не удалось найти эту функцию в Microsoft Azure SDK для Python. Я просмотрел исходный код, и он говорит только о Ручное возобновление блокировки конкретного сообщения.
Мой вариант использования выглядит следующим образом
- Мне нужно читать и обрабатывать сообщения из служебной шины.
- Забудьте об этих обработанных сообщениях в базе данных (MongoDB в моем случае)
- Сообщение, отправляемое в служебную шину, может достигать 1 миллиона событий в час (поэтому невозможно отследить, когда для данного сообщения истечет время ожидания и запустить обновление вручную для этого же).
- Все обработанные сообщения помещаются во временный список.
- Всякий раз, когда приведенный выше список превышает определенный порог, выполните массовую вставку в БД.
Вот что я придумал. Это без политики обновления блокировки, о которой я говорил. Я просто запускаю удаление, когда сообщение обрабатывается.
class Event:
def __read_subscription_message(self):
try:
message = self.bus_service.receive_subscription_message(
self.topic_name, self.subscription_name, peek_lock=True)
return message
except Exception as e:
self.logger.exception("Exception occurred!!!")
def start_listner(self, task_number=0):
self.logger.info('Task: %s, started listening to service bus messages' % task_number)
while True:
msg = self.__read_subscription_message()
if msg and msg.body is not None:
self.currentBackOff = 0
self.process_event(msg, task_number)
gevent.sleep(0)
def process_event(self, msg, task_number=0):
try:
if msg.body:
# message = json.loads(msg.body.decode())
message = self.deserialize_message_body(msg.body)
custom_properties = msg.custom_properties
# Business logic implemented................
# After processing a message append this to a temp list. Make
# an insert which length of this list reaches a given threshold
# by calling "write_to_storage(self, task_number=0)"
self.bulk_records.append(record)
msg.delete()
else:
self.logger.info("Message received: %s, is of type: %s" % (msg.body, type(msg.body)))
self.total += 1
except DeSerializationException as e:
self.logger.info("Not able to de-serialize message: %s" % msg.body)
self.logger.exception(e)
except Exception as e:
self.logger.exception(e)
def write_to_storage(self, task_number=0):
# Write to DB
Он работает нормально, но в случае, когда мой процесс подписчика будет убит, все те сообщения, которые находятся в моем временном ведре (которые не записаны в базу данных), были потеряны. Я хочу запустить руководство "message.delete()
" для сообщений, когда они записываются в базу данных. Я думаю, что AutoRenewLock
- это правильный путь, поскольку максимальное значение длительности блокировки сообщения составляет 5 минут, что не поможет в моем случае.
Спасибо