Я хотел прочитать строки из файла CSV и использовать RX.Net для некоторых преобразований, и я хотел выполнить пакетное обновление и отправлять обновление каждые 250 миллисекунд.
public static IEnumerable<string> ReadCSV(string filePath)
{
var reader = new StreamReader(File.OpenRead(filePath));
while (!reader.EndOfStream)
{
var line = reader.ReadLine();
yield return line;
}
}
var rows = ReadCSV("filePath").ToObservable();
rows
.Buffer(50)
.Zip(Observable.Interval(
TimeSpan.FromMilliseconds(250)), (res, _) => res)
.Subscribe(lines =>
{
//do something
});
Я использую CSV-файл размером около 80 МБ, но консольный проект достигает 1 ГБ.
Здесь происходит то, что Zip ожидает обе последовательности, чтобы дать ему сигнал. Последовательность Csv отдает данные очень быстро, поэтому пакетные обновления сохраняются в памяти и ожидают другой последовательности.
Что еще хуже, так это то, что память не освобождается, даже если все обновления обрабатываются. Если я удаляю Zip, память выглядит очень хорошо, похоже, что она освобождает память при обработке пакета (все приложение занимает всего около 20 МБ за все время).
Два вопроса
Есть ли способ сообщить наблюдаемому, что я хочу приостановить чтение до тех пор, пока не будет обработано предыдущее (в моем случае это буферизованные строки).
Почему память не освобождается после обработки всех обновлений, есть ли способ избежать этого?