Сервисная шина Azure - сеанс дублирует сообщения при доставке в несколько экземпляров.

Я работаю над системой, в которой мне нужно управлять упорядоченными сообщениями, через служебную шину. В моей компании все работает в облаке Azure, и я использую служебную шину Azure для обмена сообщениями. Я читал о сеансе, и мне кажется, что он может решить мою проблему из коробки с помощью служебной шины Azure, но она работает только с одним экземпляром. Как только я масштабирую задание we до нескольких экземпляров, несколько экземпляров используют одни и те же сообщения из одного сеанса. В моем POC я использую тему с подпиской.

Может ли кто-нибудь заставить его работать с масштабированием до нескольких экземпляров приемника? Второй вариант, который у меня есть, - это реализовать самостоятельно корпоративный шаблон Resequencer. Есть ли у вас какие-либо предложения?

Вот код, который я использую для получения сообщений:

static async Task InitializeReceiver(string connectionString, string queueName, CancellationToken ct)
    {
        var receiverFactory = MessagingFactory.CreateFromConnectionString(connectionString);

        ct.Register(() => receiverFactory.Close());

        var client = receiverFactory.CreateSubscriptionClient(queueName, "parallel", ReceiveMode.ReceiveAndDelete);

        client.RegisterSessionHandler(
            typeof(SessionHandler),
            new SessionHandlerOptions
            {
                AutoRenewTimeout = TimeSpan.FromMinutes(5),
                MessageWaitTimeout = TimeSpan.FromSeconds(120),
                MaxConcurrentSessions = 100,
                AutoComplete = false
            });
    }

Вот код, который я использую для обработки:

public async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
      await ProcessMessage(session, message, currentInstanceId, recipeStep);
      // If I process the last message of the session
      if (message.Sequence == numberOfMsgPerSession)
        {
            // end of the session!
            await session.CloseAsync();
            logger.LogInformation($"Session with id {message.SessionId} is completed.");

        }

}

Спасибо. С Уважением.


person OaicStef    schedule 24.07.2018    source источник
comment
Используют ли они одну и ту же подписку или подписку на каждый экземпляр?   -  person Mikhail Shilkov    schedule 24.07.2018
comment
В моем POC они используют одну и ту же подписку, если нет, я полагаю, что было бы нормально получать одни и те же сообщения,   -  person OaicStef    schedule 24.07.2018


Ответы (2)


Я использовал таймер для запуска своих методов в веб-заданиях. Что касается масштабирования, количество таймеров увеличивалось пропорционально количеству экземпляров. Там я заменил таймеры подписками на темы служебной шины Azure. Поскольку сообщения могут быть получены только один раз, методы срабатывают только один раз, даже если запущено более одного экземпляра.

У меня был активный получатель, прослушивающий сообщения, проверьте здесь для создания активного слушателя.

Поскольку вы говорили, что сообщения получены в нескольких случаях, вы, возможно, просматривали сообщения, вместо того, чтобы получать их. Просматривая сообщение, не удаляйте его из Подписки. Только его получение удалит его из Подписки, в результате чего сообщение станет недоступным для других получателей.

person Arunprabhu    schedule 24.07.2018
comment
Привет, у меня проблема возникает с использованием сеанса, вы тоже ими пользовались? - person OaicStef; 24.07.2018
comment
Нет, я использовал тему с несколькими подписками с правилами. - person Arunprabhu; 24.07.2018
comment
Без сеансов я не смогу решить упорядочивание обработки сообщений. - person OaicStef; 24.07.2018
comment
Даже при использовании сеансов сообщения можно получить только один раз. Вы уверены, что использовали режим ReceiveAndDelete для клиента и AcceptMessageSession () до получения. - person Arunprabhu; 24.07.2018
comment
На самом деле я использовал PeekAndLock и этот другой метод для активного получателя RegisterSessionHandler. Я попробую изменить код с помощью ReceiveAndDelete и AcceptMessageSession. Надеюсь решить проблему. Спасибо. Я дам Вам знать. - person OaicStef; 24.07.2018
comment
Если вы измените блокировку просмотра на получение и удаление, это наверняка решит вашу проблему :-) - person Arunprabhu; 24.07.2018
comment
Привет, снова, это не решило проблему, внезапно, когда я провожу тест с несколькими сообщениями, все идет нормально, но когда я отправляю 200 сообщений за сеанс, с 10 сеансами, обработка дублируется. Думаю, что сам реализую шаблон служебной шины Resequencer Enterprise. - person OaicStef; 24.07.2018
comment
Сообщения можно получить только один раз. Я не уверен, почему происходит дублирование обработки сообщений. Есть ли вероятность того, что в Подписку будут отправляться повторяющиеся сообщения? - person Arunprabhu; 24.07.2018
comment
Нет, этого не происходит, потому что я отслеживаю все отправленные события, обработанные в базе данных sql server, поэтому я вижу дублирование при обработке. Это действительно странно, и я вижу, что это происходит, когда я масштабируюсь в Azure, но на локальном уровне все работает нормально. - person OaicStef; 24.07.2018
comment
Можете ли вы поделиться логикой обработки сообщений, которую вы использовали в конце - person Arunprabhu; 24.07.2018
comment
Привет, я разместил код, который использую, на стороне получателя, как вы предложили. Я хочу повторить, что он работает локально с несколькими экземплярами, но не работает в Azure, возможно, это связано с какой-то ошибкой на стороне Azure? - person OaicStef; 25.07.2018

Я решил это. Если вы видите код, который я разместил в вопросе, для каждого сеанса я закрываю его, когда обрабатываю последнее сообщение, на основе его порядкового номера. Как только я удалил этот фрагмент кода, все начало работать правильно, я отправляю 200 сообщений за сеанс с 10 сеансами. Сообщения обрабатываются в порядке для каждого сеанса, и не более одного экземпляра занимает сеанс, как я ожидал.

Спасибо вам всем. OaicStef

person OaicStef    schedule 25.07.2018