Итерации в Scala, которые используют ленивую оценку или слияние?

Я слышал, что итераторы ленивы, но насколько они точно ленивы? В качестве альтернативы, можно ли объединить итерации с функцией постобработки, чтобы не создавать промежуточную структуру данных?

Могу ли я в своей итерации, например, создать 1 миллион элементов Stream[Option[String]] из java.io.BufferedReader, а затем последовательно отфильтровать None с композиционным способом, не требуя хранения всего потока в памяти? И при этом гарантировать, что я не взорву стек? Или что-то в этом роде - не обязательно использовать Stream.

В настоящее время я использую Scalaz 6, но если другие повторные реализации смогут сделать это лучше, мне было бы интересно узнать.

Предоставьте полное решение, включая закрытие BufferedReader и вызов unsafePerformIO, если применимо.


person Robin Green    schedule 14.11.2012    source источник


Ответы (2)


Вот быстрый повторный пример с использованием библиотеки Scalaz 7, демонстрирующий интересующие вас свойства: постоянная память и использование стека.

Эта проблема

Сначала предположим, что у нас есть большой текстовый файл со строкой десятичных цифр в каждой строке, и мы хотим найти все строки, содержащие не менее двадцати нулей. Мы можем сгенерировать некоторые примеры данных, например:

val w = new java.io.PrintWriter("numbers.txt")
val r = new scala.util.Random(0)

(1 to 1000000).foreach(_ =>
  w.println((1 to 100).map(_ => r.nextInt(10)).mkString)
)

w.close()

Теперь у нас есть файл с именем numbers.txt. Давайте откроем его с помощью BufferedReader:

val reader = new java.io.BufferedReader(new java.io.FileReader("numbers.txt"))

Он не слишком велик (~ 97 мегабайт), но достаточно велик, чтобы мы могли легко увидеть, действительно ли использование нашей памяти остается постоянным во время ее обработки.

Настройка нашего счетчика

Сначала для некоторого импорта:

import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I }

И перечислитель (обратите внимание, что я меняю IoExceptionOrs на Options для удобства):

val enum = I.enumReader(reader).map(_.toOption)

В настоящее время Scalaz 7 не предоставляет удобного способа перечисления строк файла, поэтому мы разбиваем файл по одному символу за раз. Это, конечно, будет мучительно медленно, но я не буду беспокоиться об этом здесь, так как цель этой демонстрации — показать, что мы можем обрабатывать этот большой файл в постоянной памяти и без разрыва стека. В последнем разделе этого ответа дается подход с более высокой производительностью, но здесь мы просто разделимся на разрывы строк:

val split = I.splitOn[Option[Char], List, IO](_.cata(_ != '\n', false))

И если вас смущает тот факт, что splitOn принимает предикат, указывающий, где не разбиваться, вы не одиноки. split — наш первый пример перечислителя. Мы продолжим и завернем в него наш перечислитель:

val lines = split.run(enum).map(_.sequence.map(_.mkString))

Теперь у нас есть перечислитель Option[String]s в монаде IO.

Фильтрация файла с перечислением

Далее для нашего предиката — помните, мы сказали, что нам нужны строки, содержащие не менее двадцати нулей:

val pred = (_: String).count(_ == '0') >= 20

Мы можем превратить это в фильтрующий перечислитель и обернуть наш перечислитель в него:

val filtered = I.filter[Option[String], IO](_.cata(pred, true)).run(lines)

Мы настроим простое действие, которое просто печатает все, что проходит через этот фильтр:

val printAction = (I.putStrTo[Option[String]](System.out) &= filtered).run

Конечно, мы еще ничего не читали. Для этого мы используем unsafePerformIO:

printAction.unsafePerformIO()

Теперь мы можем наблюдать, как Some("0946943140969200621607610...")s медленно прокручиваются, в то время как использование памяти остается постоянным. Это медленно, а обработка ошибок и вывод немного неуклюжи, но не так уж плохо, я думаю, примерно для девяти строк кода.

Получение вывода из итерации

Это было foreach-использование. Мы также можем создать итерацию, которая больше похожа на складку — например, собирает элементы, прошедшие через фильтр, и возвращает их в виде списка. Просто повторите все вышеперечисленное до определения printAction, а затем вместо этого напишите это:

val gatherAction = (I.consume[Option[String], IO, List] &= filtered).run

Выключите это действие:

val xs: Option[List[String]] = gatherAction.unsafePerformIO().sequence

А теперь иди выпей кофе (возможно, он должен быть довольно далеко). Когда вы вернетесь, у вас будет либо None (в случае IOException где-то по пути), либо Some, содержащий список из 1943 строк.

Полный (более быстрый) пример, который автоматически закрывает файл

Чтобы ответить на ваш вопрос о закрытии средства чтения, вот полный рабочий пример, который примерно эквивалентен второй программе выше, но с перечислителем, который берет на себя ответственность за открытие и закрытие средства чтения. Это также намного, намного быстрее, так как читает строки, а не символы. Сначала для импорта и пары вспомогательных методов:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, Either[Throwable, B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

def enumBuffered(r: => BufferedReader) =
  new EnumeratorT[Either[Throwable, String], IO] {
    lazy val reader = r
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(reader.readLine())).flatMap {
          case Right(null) => s.pointI
          case Right(line) => k(I.elInput(Right(line))) >>== apply[A]
          case e => k(I.elInput(e))
        }
    )
  }

А теперь перечислитель:

def enumFile(f: File): EnumeratorT[Either[Throwable, String], IO] =
  new EnumeratorT[Either[Throwable, String], IO] {
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(new BufferedReader(new FileReader(f)))).flatMap {
          case Right(reader) => I.iterateeT(
            enumBuffered(reader).apply(s).value.ensuring(IO(reader.close()))
          )
          case Left(e) => k(I.elInput(Left(e)))
        }
      )
  }

И мы готовы к работе:

val action = (
  I.consume[Either[Throwable, String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 20)) &=
  enumFile(new File("numbers.txt"))
).run

Теперь читатель будет закрыт, когда обработка будет завершена.

person Travis Brown    schedule 15.11.2012
comment
Закрывается ли BufferedReader приведенным выше кодом? Я не вижу, что бы закрыть его. - person Robin Green; 15.11.2012
comment
Вы правы, вы должны закрыть его здесь сами, но это только потому, что текущие утилиты для работы с итерациями в IO в Scalaz 7 настолько рудиментарны — не так уж сложно написать энумератор, отвечающий как за открытие, так и за закрытие файла . Когда у меня будет несколько минут, я опубликую пример. - person Travis Brown; 15.11.2012

Я должен был прочитать немного дальше... это именно то, для чего нужны enumeratees. Перечисления определены в Scalaz 7 и Play 2, но не в Scalaz 6.

Перечисленные предназначены для «вертикальной» композиции (в смысле «вертикально интегрированной отрасли»), в то время как обычные итерации составляют монадически «горизонтальным» способом.

person Robin Green    schedule 14.11.2012