генератор / блок в итератор / преобразование потока

В основном я хочу преобразовать это:

def data(block: T => Unit)

в Stream (dataToStream - это гипотетическая функция, выполняющая это преобразование):

val dataStream: Stream[T] = dataToStream(data)

Полагаю, эту проблему можно решить с помощью продолжений:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?

Спасибо, Давид


person Dawid Grzesiak    schedule 26.09.2010    source источник
comment
Награда за ответы без нитей или убедительный аргумент в пользу того, что их нет.   -  person Dave Griffith    schedule 28.09.2010
comment
Ваш блок не производит никакой ценности. Как это можно превратить в поток? Единица - синглтон.   -  person Randall Schulz    schedule 28.09.2010
comment
Желаемый поток - это последовательность аргументов, которые отправляются для блокировки, а не результаты этих вызовов.   -  person Dave Griffith    schedule 28.09.2010
comment
Зачем вам нужен Stream? По какой-то особой причине? Traversable или TraversableView дают вам большую гибкость. map, flatMap, filter и т. д. ленивы. Он использует исключения, чтобы предотвратить блокировку каждого вызова при вызове таких методов, как take. В целом, потребность в Stream кажется здесь несерьезной и требует либо (A) использования потоков, чтобы иметь возможность переключать стек между функцией данных и итерацией потока. или (B) буферизация всех значений и создание потока из этого буфера. Это больше зависит от того, какие инструменты у вас есть на JVM, хотя я хотел бы быть удивленным.   -  person jsuereth    schedule 29.09.2010
comment
Это был просто пример. Меня не волнует, получу ли я Stream, Iterator или Traversable. Суть в том, чтобы преобразовать генератор данных в ленивый, эффективный по памяти и ЦП поток данных.   -  person Dawid Grzesiak    schedule 30.09.2010
comment
@Randall Schulz = ›Преобразование блока, производящего значение, в Stream - простая задача. См. gist.github.com/603527.   -  person Dawid Grzesiak    schedule 30.09.2010


Ответы (4)


EDITED: изменены примеры, чтобы показать лень traversable.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

Метод toTraversable преобразует вашу функцию данных в коллекцию Traversable. Само по себе это ничего огромного, но вы можете преобразовать это в TraversableView, который является ленивым. Вот пример:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

Прискорбная природа метода take состоит в том, что для правильной работы он должен пройти на одно значение после последнего сгенерированного значения, но он завершится раньше. Приведенный выше код выглядел бы так же без вызова ".view". Однако вот более убедительный пример:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3

Итак, в заключение, я считаю, что коллекция, которую вы ищете, - это TraversableView, которую проще всего создать в виде, сделав Traversable, а затем вызвав для него "view". Если вам действительно нужен тип Stream, вот метод, который работает в 2.8.0.final и создаст «Stream» без потоков:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

Плохая природа этого метода заключается в том, что он будет перебирать всю возможность перемещения перед созданием потока. Это также означает, что все значения необходимо буферизовать в памяти. Единственная альтернатива - прибегнуть к нитям.

В качестве отступления: это была мотивирующая причина предпочесть Traversables в качестве прямого возврата от методов scalax.io.File: "lines", "chars" и "bytes".

person jsuereth    schedule 28.09.2010
comment
Как видите, данные сначала оцениваются, а затем преобразуются в Stream. Так что лени здесь нет. - person Dawid Grzesiak; 29.09.2010
comment
Я хочу сказать, что вы можете взаимодействовать с данными как с потоком, если используете TraversableView. Требуя тип Stream, вы ограничиваете себя. TraversableView ленив. - person jsuereth; 30.09.2010
comment
Если просматриваемое представление не выглядит ленивым в REPL, это потому, что REPL вызывает toString для результирующих выражений, и это заставит TraversableView пройти всю коллекцию (отображая все значения). Если вы разрабатываете функцию с помощью TraversableView, вы увидите ее лень. - person jsuereth; 30.09.2010
comment
Хм, это действительно неплохая идея. Иногда этого решения будет достаточно (особенно если вы хотите просмотреть все данные подряд), а иногда нет. См. gist.github.com/603569 В идеале последний пример вывода также должен быть чересстрочным. Жалко, что вы не можете сделать для него Stream или Iterator или можете, но он сначала оценит все данные. Если у вас есть Stream / Iterator, вы можете использовать два или более потока данных одновременно. Например, возьмите (3) из этого возьмите (10) из другого итератора. В любом случае это отличный и полезный фрагмент кода! - person Dawid Grzesiak; 30.09.2010
comment
Используя потоки, когда вы не использовали все данные, поток не будет остановлен, а приостановлен. Так что у него тоже есть недостатки ... - person Dawid Grzesiak; 30.09.2010
comment
Вы забыли вызвать .view на traversable. Это сделает его ленивым и чередует результаты. Без вызова .view все методы коллекций будут активны и будут генерировать промежуточные коллекции. В этом случае ваш вызов выполняется немедленно. Сначала вызовите, затем возьмите. - person jsuereth; 30.09.2010
comment
@jsuereth В общих случаях я иногда задаюсь вопросом, следует ли мне вызывать метод .view или .toStream. Какой из них эффективнее? Я вижу, что результат .view строго привязан к теме, например Traversable и TraversableView. Поэтому все классы * View должны быть подготовлены заранее, что и делают создатели Scala. - person Dawid Grzesiak; 30.09.2010
comment
Я бы не стал называть toStream, если вы не уверены, что можете уместить всю коллекцию в памяти и не хотите это делать. - person jsuereth; 30.09.2010
comment
Мой вопрос был частью представленного решения. Я имею в виду, каковы плюсы и минусы вызова .view и .toStream в общих случаях. Потоки ленивы и эффективны с точки зрения памяти. - person Dawid Grzesiak; 30.09.2010
comment
Потоки ленивы и эффективны с точки зрения памяти, однако реализация toStream по умолчанию должна буферизовать все коллекции перед созданием потока. Интерфейс Iterable может сделать toStream соответствующим образом ленивым. Большинство коллекций расширяют Iterable, так что в этом случае вы правы, они ленивы. Однако потоки не так эффективно используют память, как вы думаете. Они запоминают- ›То есть они сохраняют ранее сгенерированные значения. В этом случае представление более эффективно с точки зрения памяти, поскольку оно не сохраняет сгенерированные значения. - person jsuereth; 30.09.2010

Вот простое решение, которое порождает поток, потребляющий данные. Он отправляет данные в SynchronousQueue. Создается и возвращается поток, который извлекает данные из очереди:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   
person Geoff Reedy    schedule 26.09.2010
comment
Из-за ограничений CPS это может быть единственным решением для Scala до v2.8. К сожалению, это в 170 раз медленнее, чем при использовании чистого генератора. См. gist.github.com/a79c0a9669eea3d47eee. - person Dawid Grzesiak; 27.09.2010

Мне все еще нужно придумать, как это сделать. Я подозреваю, что ответ лежит где-то здесь:

Изменить: удален код, показывающий, как решить другую проблему.

Edit2: использование кода http://gist.github.com/580157, который изначально был опубликован http://gist.github.com/574873, вы можете сделать это:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data не принимает код блока, но я думаю, что это нормально, потому что с продолжением блок может обрабатываться вызывающим. Код для генератора можно увидеть в github.

person huynhjl    schedule 26.09.2010
comment
Эээ, разве вы не решили совершенно другую проблему, чем проблема OP? Функция data OP вызывала функцию block десять раз, и он хотел превратить это в поток из десяти элементов. Ваша data функция вызывает block только один раз. - person sepp2k; 26.09.2010
comment
@ sepp2k, эээ, да, конечно. Думаю, тогда необходимо продолжение. - person huynhjl; 26.09.2010
comment
Я попытался использовать код из этого потока stackoverflow.com/questions/2201882/, но безуспешно - person Dawid Grzesiak; 27.09.2010
comment
Да, я пробовал это раньше. К сожалению, это не решает проблему из-за ограничений CPS. См. Код gist.github.com/599575 Он возвращает ошибку: несоответствие типа; найдено: Unit @ scala.util.continuations.cpsParam [Unit, Unit] Требуется: Данные объекта {i = ›yld (i)} - person Dawid Grzesiak; 27.09.2010
comment
@Dawid См. Комментарий, который я добавил к этому фрагменту. - person Daniel C. Sobral; 28.09.2010

Вот реализация на основе продолжений с разделителями, адаптированная из предложения @Geoff Reedy:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}
person Tom Crockett    schedule 27.09.2010