У меня есть приложение со следующим рабочим процессом.
- Пользователи загружают файл json на основе строк (каждая строка является записью) с помощью minio.
- Затем они отправляют запрос приложения для его обработки.
Затем приложение начинает скачивать данные в виде потока, используя этот метод со следующей сигнатурой
Task GetObjectAsync(string bucketName, string objectName, Action<Stream> callback)
Метод обратного вызова, который я использую, делает это:
void ProcessLine(Stream s)
{
using(var streamReader = new StreamReader(s))
{
while(!streamReader.EndOfStream)
{
var line = streamReader.ReadLine(); // notice that I can't use ReadLineAsync
var obj = DeserializeLine(line);
// some other operations
database.Store(obj)
// there is an alternative StoreAsync() which I can' use
}
}
}
Это работает хорошо, пока мне не нужно использовать асинхронные версии методов, а файлы относительно малы.
К сожалению, мне нужно быть готовым к варианту использования, когда есть только один, но очень большой файл (20 ГБ или что-то еще, что не помещается в память, представьте себе огромный набор данных).
Для этого я решил использовать очередь производителей-потребителей, которая будет заполняться из действия обратного вызова, а затем обрабатываться некоторыми работниками.
Я использовал ConcurentQueue как структуру данных и следующий обратный вызов
void PopulateQueue(Stream s)
{
using(var streamReader = new StreamReader(s))
{
while(!streamReader.EndOfStream)
{
var line = streamReader.ReadLine();
var obj = DeserializeLine(line);
_queue.Enqueue(obj); // _queue is a private field of a type ConcurentQueue<MyObject>
}
}
}
А воркеры обрабатывают так же, как и в исходной версии, но с асинхронными методами.
Проблема здесь заключается в том, что производитель намного быстрее заполняет очередь, которую обрабатывают потребители (удаляя данные из очереди). Очередь начинает расти и, как предполагалось, съест всю память, так как файл был очень большим.
Очевидным решением этой проблемы является ограничение количества записей в очереди. Но я не знаю, как это сделать в синхронном обратном вызове. В асинхронном я бы использовал await Task.Delay(100)
всякий раз, когда в очереди слишком много записей.
Согласно этой статье , я должен избегать использования Task.Wait()
из-за его негативного влияния на производительность или возможности взаимоблокировки.
Я читал несколько статей Стивена Клири о лучших практиках асинхронности в .Net. К сожалению, я понял из них, что в этом случае нет правильного способа вызвать асинхронные методы из обратного вызова синхронизации, и у меня плохое предчувствие по поводу использования Thread.Sleep()
или ожидания.
Есть ли у вас какие-либо советы, как использовать шаблон производителя-потребителя, не нарушая принципов асинхронности, или решить эту проблему другим способом?
Спасибо.
ПРИМЕЧАНИЕ. Я думал о разделении файла на куски фиксированного размера при их загрузке, но у этого есть свои подводные камни.