поток данных вне игры довольно прост.
вот краткий пример того, как я собираюсь это сделать (пожалуйста, дайте мне знать, если я делаю это неправильно):
def getRandomStream = Action { implicit req =>
import scala.util.Random
import scala.concurrent.{blocking, ExecutionContext}
import ExecutionContext.Implicits.global
def getSomeRandomFutures: List[Future[String]] = {
for {
i <- (1 to 10).toList
r = Random.nextInt(30000)
} yield Future {
blocking {
Thread.sleep(r)
}
s"after $r ms. index: $i.\n"
}
}
val enumerator = Concurrent.unicast[Array[Byte]] {
(channel: Concurrent.Channel[Array[Byte]]) => {
getSomeRandomFutures.foreach {
_.onComplete {
case Success(x: String) => channel.push(x.getBytes("utf-8"))
case Failure(t) => channel.push(t.getMessage)
}
}
//following future will close the connection
Future {
blocking {
Thread.sleep(30000)
}
}.onComplete {
case Success(_) => channel.eofAndEnd()
case Failure(t) => channel.end(t)
}
}
}
new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}
теперь, если вас обслужит это действие, вы получите что-то вроде:
after 1757 ms. index: 10.
after 3772 ms. index: 3.
after 4282 ms. index: 6.
after 4788 ms. index: 8.
after 10842 ms. index: 7.
after 12225 ms. index: 4.
after 14085 ms. index: 9.
after 17110 ms. index: 1.
after 21213 ms. index: 2.
after 21516 ms. index: 5.
где каждая строка принимается после того, как прошло случайное время.
теперь представьте, что я хочу сохранить этот простой пример при потоковой передаче данных с сервера на клиент, но я также хочу поддерживать полную потоковую передачу данных от клиента к server.
Итак, скажем, я реализую новый BodyParser
, который анализирует ввод в List[Future[String]]
. это означает, что теперь мой Action
может выглядеть примерно так:
def getParsedStream = Action(myBodyParser) { implicit req =>
val xs: List[Future[String]] = req.body
val enumerator = Concurrent.unicast[Array[Byte]] {
(channel: Concurrent.Channel[Array[Byte]]) => {
xs.foreach {
_.onComplete {
case Success(x: String) => channel.push(x.getBytes("utf-8"))
case Failure(t) => channel.push(t.getMessage)
}
}
//again, following future will close the connection
Future.sequence(xs).onComplete {
case Success(_) => channel.eofAndEnd()
case Failure(t) => channel.end(t)
}
}
}
new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}
но это все еще не то, чего я хотел добиться. в этом случае я получу тело из запроса только после завершения запроса и загрузки всех данных на сервер. но я хочу начать обслуживать запросы по ходу дела. простой демонстрацией будет повторение любой полученной строки обратно пользователю, сохраняя при этом соединение.
Итак, вот мои текущие мысли:
что, если мой BodyParser
вернет Enumerator[String]
вместо List[Future[String]]
?
в этом случае я мог бы просто сделать следующее:
def getParsedStream = Action(myBodyParser) { implicit req =>
new Status(200).chunked(req.body).as("text/plain;charset=UTF-8")
}
так что теперь я столкнулся с проблемой, как реализовать такой BodyParser
. если быть более точным в отношении того, что именно мне нужно, ну:
мне нужно получить фрагменты данных для анализа в виде строки, где каждая строка заканчивается новой строкой \n
(хотя может содержать несколько строк...). каждый "кусок строк" будет обрабатываться некоторым (не относящимся к данному вопросу) вычислением, которое даст String
или, лучше, Future[String]
, поскольку это вычисление может занять некоторое время. результирующие строки этого вычисления должны быть отправлены пользователю по мере их готовности, так же, как случайный пример выше. и это должно происходить одновременно, пока отправляется больше данных.
Я изучил несколько ресурсов, пытаясь добиться этого, но пока безуспешно. например scalaQuery play iteratees -> кажется, что этот парень делает что-то похожее на то, что я хочу сделать, но я не смог перевести это в полезный пример. (и различия между play2.0 и play2.2 API не помогают...)
Итак, резюмируем: правильный ли это подход (учитывая, что я не хочу использовать WebSockets
) ? и если да, то как реализовать такой BodyParser
?
РЕДАКТИРОВАТЬ:
Я только что наткнулся на примечание в игровой документации относительно этой проблемы, в котором говорится:
Примечание. Такого же живого общения можно добиться и наоборот, используя бесконечный HTTP-запрос, обрабатываемый настраиваемым
BodyParser
, который получает фрагменты входных данных, но это гораздо сложнее.
так что я не сдаюсь, теперь, когда я точно знаю, что это достижимо.