Проблема обратного давления в rx.net

Я хотел прочитать строки из файла CSV и использовать RX.Net для некоторых преобразований, и я хотел выполнить пакетное обновление и отправлять обновление каждые 250 миллисекунд.

public static IEnumerable<string> ReadCSV(string filePath)
{
    var reader = new StreamReader(File.OpenRead(filePath));
    while (!reader.EndOfStream)
    {
        var line = reader.ReadLine();
        yield return line;
    }
}

var rows = ReadCSV("filePath").ToObservable();

rows
    .Buffer(50)
    .Zip(Observable.Interval(
        TimeSpan.FromMilliseconds(250)), (res, _) => res)
    .Subscribe(lines =>
        {
            //do something
        });

Я использую CSV-файл размером около 80 МБ, но консольный проект достигает 1 ГБ.

Здесь происходит то, что Zip ожидает обе последовательности, чтобы дать ему сигнал. Последовательность Csv отдает данные очень быстро, поэтому пакетные обновления сохраняются в памяти и ожидают другой последовательности.

Что еще хуже, так это то, что память не освобождается, даже если все обновления обрабатываются. Если я удаляю Zip, память выглядит очень хорошо, похоже, что она освобождает память при обработке пакета (все приложение занимает всего около 20 МБ за все время).

Два вопроса

  1. Есть ли способ сообщить наблюдаемому, что я хочу приостановить чтение до тех пор, пока не будет обработано предыдущее (в моем случае это буферизованные строки).

  2. Почему память не освобождается после обработки всех обновлений, есть ли способ избежать этого?


person Will    schedule 03.07.2019    source источник


Ответы (2)


Мне удалось найти решение для вопроса 1.

rows
    .Buffer(50)
    .Select(lines =>
    {
        Thread.Sleep(250);
        return lines;
    }
    .Subscribe(lines =>
        {
            //do something
        });

Весь процесс синхронизирован, поэтому, когда я делаю Thread.Sleep, наблюдаемые также перестают читать данные.

Хотя, возможно, это не очень хороший ответ.

person Will    schedule 03.07.2019

Мне не удалось воссоздать вашу проблему с использованием памяти. Я использовал файл размером 50 мб. Однако я предполагаю, что часть вашей проблемы заключается в том, что .ToObservable() извлекает данные из IEnumerable как можно быстрее.

Так почему бы просто не замедлить IEnumerable, скорость которого вы вытягиваете данные с диска, методом расширения?

(Оператор .Buffer() для IEnumerable, используемый в примере, доступен в Ix.Net).

Вот так:

ReadCSC()
.Buffer(50)
.SlowDown(250)
.ToObservable() etc.
...

public static IEnumerable<IList<string>> SlowDown(this IEnumerable<IList<string>> source, int milliSeconds)
{
    foreach(var item in source)
    {
        yield return item;
        Thread.Sleep(milliSeconds);
    }
}

(В C# 8 можно будет сделать этот метод асинхронным и использовать Task.Delay вместо Thread.Sleep, чтобы не блокировать поток).

Таким образом, ваши данные считываются с диска медленнее. Решит ли это вашу проблему с памятью, я не знаю.

person Magnus    schedule 13.07.2019