SubscriptionClient.RecieveBatch не получает все сообщения через посредника

У меня есть консольное приложение для чтения всех сообщений через посредника, присутствующих в подписке в служебной шине Azure. У меня там около 3500 сообщений. Это мой код для чтения сообщений:

SubscriptionClient client = messagingFactory.CreateSubscriptionClient(topic, subscription);   
long count = namespaceManager.GetSubscription(topic, subscription).MessageCountDetails.ActiveMessageCount;
Console.WriteLine("Total messages to process : {0}", count.ToString()); //Here the number is showing correctly
IEnumerable<BrokeredMessage> dlIE = null;
dlIE = client.ReceiveBatch(Convert.ToInt32(count));

Когда я выполняю код в dlIE, я вижу только 256 сообщений. Я также попытался указать счетчик предварительной выборки, как этот client.PrefetchCount, но тогда он также возвращает только 256 сообщений.

Я думаю, что есть некоторое ограничение на количество сообщений, которые можно получить за один раз, однако на странице msdn для метода RecieveBatch ничего подобного не упоминается. Что я могу сделать, чтобы получать все сообщения за раз?

Примечание.

  1. Я хочу только прочитать сообщение, а затем оставить его на служебной шине. Поэтому я не использую message.complete метод.

  2. Я не могу удалить и заново создать тему / подписку из служебной шины.

Изменить:

Я использовал PeekBatch вместо ReceiveBatch вот так:

    IEnumerable<BrokeredMessage> dlIE = null;
                            List<BrokeredMessage> bmList = new List<BrokeredMessage>();
  long i = 0;
   dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count)); // count is the total number of messages in the subscription.
  bmList.AddRange(dlIE);
  i = dlIE.Count();
 if(i < count)
  {           
 while(i < count)
  {
  IEnumerable<BrokeredMessage> dlTemp = null;
   dlTemp = subsciptionClient.PeekBatch(i, Convert.ToInt32(count));
    bmList.AddRange(dlTemp);
    i = i + dlTemp.Count();
    }
    }

У меня в подписке 3255 сообщений. При первом вызове peekBatch он получает 250 сообщений. поэтому он переходит в цикл while с PeekBatch(250,3225). Каждый раз приходит только 250 сообщений. Окончательное общее количество сообщений, которые у меня есть в списке вывода, составляет 3500 с дубликатами. Я не могу понять, как это происходит.


person nitinvertigo    schedule 19.10.2015    source источник


Ответы (3)


Я понял это. Клиент подписки запоминает последний полученный пакет и при повторном вызове получает следующий пакет.

Итак, код будет:

    IEnumerable<BrokeredMessage> dlIE = null;
List<BrokeredMessage> bmList = new List<BrokeredMessage>();
  long i = 0;  
  while (i < count)
  {
   dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count));
   bmList.AddRange(dlIE);
   i = i + dlIE.Count();
  }

Спасибо MikeWo за руководство

Примечание. Похоже, существует какое-то ограничение на количество сообщений, которые вы можете просматривать за раз. Я пробовал использовать разные подписки, и количество полученных сообщений было разным для каждой.

person nitinvertigo    schedule 19.10.2015
comment
Я упомянул об этом в своем ответе. При использовании одного и того же SubscriptionClient с PeekBatch под капотом последний извлеченный порядковый номер сохраняется, поскольку при циклическом прохождении он должен отслеживать и проходить через всю очередь. - person MikeWo; 19.10.2015
comment
@MikeWo извините, я не прочитал ваш ответ здесь, но для другого вопроса в SO. На самом деле, прежде чем я увидел ваш ответ здесь, я работал с peekbatch на основе вашего ответа где-то еще. - person nitinvertigo; 19.10.2015
comment
@MikeWo Есть ли ограничение на количество сообщений, получаемых PeekBatch ?? Он отлично работает для 3200, но когда в подписке 5000 сообщений, отображается следующая ошибка: Внутренняя ошибка сервера: сервер не предоставил значимый ответ; это могло быть вызвано преждевременным завершением сеанса. - person nitinvertigo; 19.10.2015
comment
@MikeWo Я думаю, есть проблема с темой. Потому что даже использование peekbatch (250) дает ту же ошибку. подписка на 3200 сообщений (которая работала) была в другой теме. - person nitinvertigo; 19.10.2015
comment
Теоретически в документации, которую я смог найти, не упоминается максимум, а параметр messageCount - это Int; однако я не думаю, что когда-либо рассчитываю, что смогу так захватить сразу большие группы. Для подглядывания это, вероятно, не так уж и плохо, но это зависит от того, насколько они велики, постоянства в размере и того, что вы делаете с ними в целом. Что касается ошибки, как вы сказали, это может быть что-то еще, а не проблема с количеством сообщений. - person MikeWo; 19.10.2015
comment
@MikeWo Я думаю, дело не в количестве сообщений, а в общем размере полученного пакета. Потому что для некоторых подписок я могу получить за раз 250, а для некоторых только 100. В моем случае размеры сообщений для разных подписок различаются. - person nitinvertigo; 19.10.2015
comment
Для меня это имеет смысл. Как я уже упоминал ранее, получение такого количества сообщений одновременно, даже при подглядывании, кажется не очень хорошей идеей в зависимости от того, почему вы заставляете их начинать. - person MikeWo; 20.10.2015

Тема, в которую вы пишете, случайно разбита на разделы? Когда вы получаете сообщения от разделенного на разделы объекта, оно будет извлекаться только из одного из разделов за раз. из MSDN:

"Когда клиент хочет получить сообщение из секционированной очереди или из подписки секционированной темы, служебная шина запрашивает сообщения во всех фрагментах, а затем возвращает первое сообщение, которое возвращается из любого хранилища сообщений в получатель. Служебная шина кэширует другие сообщения и возвращает их при получении дополнительных запросов на получение. Принимающий клиент не знает о секционировании; о поведении секционированной очереди или темы при обращении к клиенту (например, чтение, завершение, отложенное, мертвое письмо , предварительная выборка) идентична поведению обычного объекта "

Вероятно, не стоит предполагать, что даже с несекционированной сущностью вы получите все сообщения за один раз с помощью методов Receive или Peek. Было бы гораздо эффективнее перебирать сообщения гораздо меньшими партиями, особенно если ваше сообщение имеет приличный размер или неопределенный размер.

Поскольку вы на самом деле не хотите удалять сообщение из очереди, я бы предложил использовать PeekBatch вместо ReceiveBatch. Это позволяет получить копию сообщения и не блокирует ее. Я настоятельно рекомендую цикл с использованием того же SubscriptionClient вместе с PeekBatch. При использовании одного и того же SubscriptionClient с PeekBatch под капотом последний извлеченный порядковый номер сохраняется, поскольку при циклическом прохождении он должен отслеживать и проходить через всю очередь. По сути, это позволит вам прочитать всю очередь.

person MikeWo    schedule 19.10.2015
comment
Привет, спасибо за ответ. Я использовал подход PeekBatch и у меня возникла проблема. Я обновил вопрос. Не могли бы вы взглянуть на это. - person nitinvertigo; 19.10.2015

Я столкнулся с аналогичной проблемой, когда client.ReceiveBatchAsync(....) не извлекал данные из подписки в сервисной шине Azure.

После некоторого покопания я обнаружил, что для каждого подписчика есть бит для включения пакетных операций. Это можно включить только через PowerShell. Ниже приведена команда, которую я использовал:

$subObject = Get-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName' -SubscriptionName '#subscriptionName' 
$subObject.EnableBatchedOperations = $True

Set-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName'-SubscriptionObj $subObject

Более подробную информацию можно найти на здесь. Хотя он еще не загрузил все сообщения, по крайней мере, он начал очищать очередь. Насколько мне известно, параметр размера пакета используется только как предложение для служебной шины, но не как правило.

Надеюсь, поможет!

person jonmall    schedule 05.09.2017