Обработка потоковых данных с использованием производителей-потребителей с 1 синхронным производителем

У меня есть приложение со следующим рабочим процессом.

  1. Пользователи загружают файл json на основе строк (каждая строка является записью) с помощью minio.
  2. Затем они отправляют запрос приложения для его обработки.

Затем приложение начинает скачивать данные в виде потока, используя этот метод со следующей сигнатурой

Task GetObjectAsync(string bucketName, string objectName, Action<Stream> callback)

Метод обратного вызова, который я использую, делает это:

void ProcessLine(Stream s)
{
    using(var streamReader = new StreamReader(s))
    {
        while(!streamReader.EndOfStream)
        {
            var line = streamReader.ReadLine(); // notice that I can't use ReadLineAsync
            var obj = DeserializeLine(line);
            // some other operations
            database.Store(obj) 
            // there is an alternative StoreAsync() which I can' use
        }
    }
}

Это работает хорошо, пока мне не нужно использовать асинхронные версии методов, а файлы относительно малы.

К сожалению, мне нужно быть готовым к варианту использования, когда есть только один, но очень большой файл (20 ГБ или что-то еще, что не помещается в память, представьте себе огромный набор данных).

Для этого я решил использовать очередь производителей-потребителей, которая будет заполняться из действия обратного вызова, а затем обрабатываться некоторыми работниками.

Я использовал ConcurentQueue как структуру данных и следующий обратный вызов

void PopulateQueue(Stream s)
{
    using(var streamReader = new StreamReader(s))
    {
        while(!streamReader.EndOfStream)
        {
            var line = streamReader.ReadLine();
            var obj = DeserializeLine(line);
            _queue.Enqueue(obj); // _queue is a private field of a type ConcurentQueue<MyObject> 
        }
    }
}

А воркеры обрабатывают так же, как и в исходной версии, но с асинхронными методами.

Проблема здесь заключается в том, что производитель намного быстрее заполняет очередь, которую обрабатывают потребители (удаляя данные из очереди). Очередь начинает расти и, как предполагалось, съест всю память, так как файл был очень большим.

Очевидным решением этой проблемы является ограничение количества записей в очереди. Но я не знаю, как это сделать в синхронном обратном вызове. В асинхронном я бы использовал await Task.Delay(100) всякий раз, когда в очереди слишком много записей.

Согласно этой статье , я должен избегать использования Task.Wait() из-за его негативного влияния на производительность или возможности взаимоблокировки.

Я читал несколько статей Стивена Клири о лучших практиках асинхронности в .Net. К сожалению, я понял из них, что в этом случае нет правильного способа вызвать асинхронные методы из обратного вызова синхронизации, и у меня плохое предчувствие по поводу использования Thread.Sleep() или ожидания.

Есть ли у вас какие-либо советы, как использовать шаблон производителя-потребителя, не нарушая принципов асинхронности, или решить эту проблему другим способом?

Спасибо.

ПРИМЕЧАНИЕ. Я думал о разделении файла на куски фиксированного размера при их загрузке, но у этого есть свои подводные камни.


person JaK    schedule 02.01.2020    source источник
comment
docs.microsoft.com/en-us/ dotnet/стандарт/коллекции/   -  person Hans Passant    schedule 03.01.2020
comment
Предполагая, что я использую коллекцию блокировки, упомянутую @HansPassant. Когда несколько файлов обрабатываются одновременно, каждый производитель (возможно) блокирует один поток при заполнении (или ожидании) очереди. Может ли это быстро привести к голоданию пула потоков? Спасибо   -  person JaK    schedule 03.01.2020
comment
Их не так много, когда у вас один производитель. Какой правильный номер при чтении из файла. Блокировка желательна и неизбежна, когда потребители не могут идти в ногу. Очень мало оснований предполагать, что любой из этих потоков должен быть потоком пула потоков, все они слишком много работают над заданием. Используйте нить.   -  person Hans Passant    schedule 03.01.2020
comment
Под множеством производителей я имел в виду, что это приложение может быть веб-приложением, где каждый запрос обрабатывается отдельно. Поэтому, используя эту концепцию, каждый производитель (я предполагаю) в конечном итоге будет работать в другом потоке. Будет ли это проблемой?   -  person JaK    schedule 03.01.2020


Ответы (1)


Как отметил @Hans Passant в комментариях, существует BlockingCollection, который решает проблему.

У него есть лимит предметов внутри, и когда лимит достигнут, он блокирует производителя до тех пор, пока количество предметов не уменьшится.

person JaK    schedule 03.01.2020