Потоковая передача данных одновременно по одному HTTP-соединению в игре.

поток данных вне игры довольно прост.
вот краткий пример того, как я собираюсь это сделать (пожалуйста, дайте мне знать, если я делаю это неправильно):

def getRandomStream = Action { implicit req =>

  import scala.util.Random
  import scala.concurrent.{blocking, ExecutionContext}
  import ExecutionContext.Implicits.global

  def getSomeRandomFutures: List[Future[String]] = {
    for {
      i <- (1 to 10).toList
      r = Random.nextInt(30000)
    } yield Future {
      blocking {
        Thread.sleep(r)
      }
      s"after $r ms. index: $i.\n"
    }
  }

  val enumerator = Concurrent.unicast[Array[Byte]] {
    (channel: Concurrent.Channel[Array[Byte]]) => {
      getSomeRandomFutures.foreach {
        _.onComplete {
          case Success(x: String) => channel.push(x.getBytes("utf-8"))
          case Failure(t) => channel.push(t.getMessage)
        }
      }
      //following future will close the connection
      Future {
        blocking {
          Thread.sleep(30000)
        }
      }.onComplete {
        case Success(_) => channel.eofAndEnd()
        case Failure(t) => channel.end(t)
      }
    }
  }
  new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}

теперь, если вас обслужит это действие, вы получите что-то вроде:

after 1757 ms. index: 10.
after 3772 ms. index: 3.
after 4282 ms. index: 6.
after 4788 ms. index: 8.
after 10842 ms. index: 7.
after 12225 ms. index: 4.
after 14085 ms. index: 9.
after 17110 ms. index: 1.
after 21213 ms. index: 2.
after 21516 ms. index: 5.

где каждая строка принимается после того, как прошло случайное время.
теперь представьте, что я хочу сохранить этот простой пример при потоковой передаче данных с сервера на клиент, но я также хочу поддерживать полную потоковую передачу данных от клиента к server.

Итак, скажем, я реализую новый BodyParser, который анализирует ввод в List[Future[String]]. это означает, что теперь мой Action может выглядеть примерно так:

def getParsedStream = Action(myBodyParser) { implicit req =>

  val xs: List[Future[String]] = req.body

  val enumerator = Concurrent.unicast[Array[Byte]] {
    (channel: Concurrent.Channel[Array[Byte]]) => {
      xs.foreach {
        _.onComplete {
          case Success(x: String) => channel.push(x.getBytes("utf-8"))
          case Failure(t) => channel.push(t.getMessage)
        }
      }
      //again, following future will close the connection
      Future.sequence(xs).onComplete {
        case Success(_) => channel.eofAndEnd()
        case Failure(t) => channel.end(t)
      }
    }
  }
  new Status(200).chunked(enumerator).as("text/plain;charset=UTF-8")
}

но это все еще не то, чего я хотел добиться. в этом случае я получу тело из запроса только после завершения запроса и загрузки всех данных на сервер. но я хочу начать обслуживать запросы по ходу дела. простой демонстрацией будет повторение любой полученной строки обратно пользователю, сохраняя при этом соединение.

Итак, вот мои текущие мысли:
что, если мой BodyParser вернет Enumerator[String] вместо List[Future[String]]?
в этом случае я мог бы просто сделать следующее:

def getParsedStream = Action(myBodyParser) { implicit req =>
  new Status(200).chunked(req.body).as("text/plain;charset=UTF-8")
}

так что теперь я столкнулся с проблемой, как реализовать такой BodyParser. если быть более точным в отношении того, что именно мне нужно, ну:
мне нужно получить фрагменты данных для анализа в виде строки, где каждая строка заканчивается новой строкой \n (хотя может содержать несколько строк...). каждый "кусок строк" будет обрабатываться некоторым (не относящимся к данному вопросу) вычислением, которое даст String или, лучше, Future[String], поскольку это вычисление может занять некоторое время. результирующие строки этого вычисления должны быть отправлены пользователю по мере их готовности, так же, как случайный пример выше. и это должно происходить одновременно, пока отправляется больше данных.

Я изучил несколько ресурсов, пытаясь добиться этого, но пока безуспешно. например scalaQuery play iteratees -> кажется, что этот парень делает что-то похожее на то, что я хочу сделать, но я не смог перевести это в полезный пример. (и различия между play2.0 и play2.2 API не помогают...)

Итак, резюмируем: правильный ли это подход (учитывая, что я не хочу использовать WebSockets) ? и если да, то как реализовать такой BodyParser?

РЕДАКТИРОВАТЬ:

Я только что наткнулся на примечание в игровой документации относительно этой проблемы, в котором говорится:

Примечание. Такого же живого общения можно добиться и наоборот, используя бесконечный HTTP-запрос, обрабатываемый настраиваемым BodyParser, который получает фрагменты входных данных, но это гораздо сложнее.

так что я не сдаюсь, теперь, когда я точно знаю, что это достижимо.


person gilad hoch    schedule 11.12.2013    source источник
comment
Я не уверен, что HTTP даже поддерживает то, что вы пытаетесь сделать, и не похоже, что это стоило бы делать - все, что я прочитал, предполагает, что браузеры не будут обрабатывать ответ до тех пор, пока они не закончат загрузку полного запроса. Почему бы не разбить это на отдельные запросы с подключением Keep-Alive? Или используйте два отдельных соединения, например. запрос AJAX, загружающий данные, и вечный iframe JSONP, выводящий данные?   -  person wingedsubmariner    schedule 11.12.2013
comment
в моем случае клиент (обычно) не является веб-браузером, поэтому мне все равно, поддерживают ли его браузеры или нет. и вообще, я говорю о потоковой передаче больших объемов данных, которые можно обрабатывать по ходу дела. я бы никогда не хотел, чтобы все данные хранились ни в памяти моего сервера, ни на диске. и из-за характера моего сервиса важно сделать это как один HTTP-запрос.   -  person gilad hoch    schedule 11.12.2013
comment
Это похоже на проблему XY, что вы действительно пытаетесь сделать?   -  person wingedsubmariner    schedule 11.12.2013
comment
я опишу одну особенность: клиентский ввод - это URL-адреса темы. каждая строка содержит 1 URL. вывод сервера - N-тройные строки. каждый URL-адрес, отправляемый клиентом, сопоставляется с несколькими n-тройками, описывающими тему. ввод и вывод могут содержать миллионы строк. есть и другие причины, по которым это должно быть одно http-соединение. (например, другие службы уже используют его, и я не могу сломать API...)   -  person gilad hoch    schedule 11.12.2013
comment
Вы можете сохранить одно соединение и использовать несколько запросов. Это действительно единственный способ, который поддерживает HTTP, и будет гораздо лучше сделать это, чем пытаться взломать свои собственные расширения для HTTP. Например, вы можете REST полностью выдавать ошибки для каждого запроса отдельно. Вы можете использовать одно соединение и не только обрабатывать эти запросы, но и смешивать другие, которые вы упомянули, которые поддерживает ваш API. С HTTP 2.0 эти запросы могут обслуживаться одновременно и отвечать на них не по порядку. Ваш хак с фрагментированным запросом и ответом не может сделать ничего из этого.   -  person wingedsubmariner    schedule 11.12.2013


Ответы (2)


То, что вы хотите сделать, вполне невозможно в Google Play.

Проблема в том, что Play не может начать отправку ответа, пока полностью не получит запрос. Таким образом, вы можете либо получить запрос полностью, а затем отправить ответ, как вы это делали, или вы можете обрабатывать запросы по мере их получения (в пользовательском BodyParser), но вы по-прежнему можете " не отвечайте до тех пор, пока не получите запрос полностью (именно на это намекала примечание в документации, хотя вы можете отправить ответ в другом соединении).

Чтобы понять почему, обратите внимание, что Action по сути является (RequestHeader) => Iteratee[Array[Byte], SimpleResult]. В любой момент Iteratee находится в одном из трех состояний — Done, Cont или Error. Он может принимать больше данных, только если находится в состоянии Cont, но может возвращать значение только в состоянии Done. Поскольку это возвращаемое значение равно SimpleResult (т. е. наш ответ), это означает, что между получением данных и их отправкой существует жесткое отключение.

Согласно этому ответу, стандарт HTTP позволяет ответ до того, как запрос будет завершен, но большинство браузеров не соблюдают спецификацию, и в любом случае Play не поддерживает ее, как объяснялось выше.

Самый простой способ реализовать полнодуплексную связь в Play — использовать веб-сокеты, но мы это исключили. Если использование ресурсов сервера является основной причиной изменения, вы можете попробовать проанализировать свои данные с помощью play.api.mvc.BodyParsers.parse.temporaryFile, который сохранит данные во временный файл, или play.api.mvc.BodyParsers.parse.rawBuffer, который переполнит временный файл, если запрос слишком велик.

В противном случае я не вижу разумного способа сделать это с помощью Play, поэтому вы можете рассмотреть возможность использования другого веб-сервера.

person James_pic    schedule 31.01.2014
comment
... стандарт HTTP разрешает ответ до завершения запроса, но большинство браузеров не соблюдают спецификацию... поскольку в моем случае клиент не является браузером, это не проблема . и в любом случае Play его не поддерживает. - вы уверены? Я ничего не мог найти по этому поводу. в любом случае, о чем я думал, это не позволить Iteratee достичь состояния Done перед ответом и каким-то образом использовать соединение, чтобы начать потоковую передачу данных обратно во время их анализа. если это невозможно, я был бы признателен за ссылку. Благодарю. - person gilad hoch; 02.02.2014
comment
Причина, по которой это невозможно, заключается в том, что ответ создается Iteratee, когда он находится в состоянии Done. Если вы посмотрите на API для Iteratee, вы не сможете получить от него результат, когда он находится в любом другом состоянии. Состояния представлены тремя классами case: Cont, Done и Err, и только Done включает результат. Это единственный и единственный способ получить ответ. - person James_pic; 02.02.2014
comment
Более того, если вы посмотрите на реализацию веб-сервера Play (в классах play.core), вы увидите, что он прекращает обработку данных, когда Iteratee находится в состоянии Done, поэтому, если вы думали как-то обмануть его (например, переопределение fold в вашей итерации, чтобы вызвать обратные вызовы Done и Cont), это тоже не сработает. - person James_pic; 02.02.2014
comment
поскольку я хочу сделать что-то неортодоксальное, я не против иметь хакерское решение, а не идти стандартным путем. если есть возможность как-то подключиться в BodyParser и начать присылать ответы по ходу дела, я соглашусь. другим хакерским вариантом было бы каким-то образом оставить Iteratee в режиме Cont, чтобы он по-прежнему потреблял входящие данные, но с готовностью вызывал действие с Enumerator, испускающим проанализированные фрагменты, которые доступны в качестве тела запроса. - person gilad hoch; 03.02.2014
comment
Фундаментальная проблема заключается в том, что единственный способ, которым вы можете передать Enumerator в Play, — это заставить ваш Iteratee вернуть SimpleResult, и единственный способ, которым это может произойти, — это если ваш Iteratee находится в состоянии Done. Я рассматривал хакерский вариант, подобный тому, который вы предлагаете: Iteratee сигнализирует о своем состоянии, вызывая один из трех обратных вызовов, поэтому я задался вопросом, можно ли разорвать контракт Iteratee, вызвав оба обратных вызова Done и Cont. К сожалению, это тоже не сработает по причинам, изложенным в моем предыдущем комментарии. - person James_pic; 04.02.2014
comment
Строки 65-72 из github.com/playframework/playframework/blob/ демонстрирует, что Play перестанет помещать данные в Iteratee, как только у него появится Result, даже если вы каким-то образом обманом заставили его вернуть значение из состояния Cont. Если вы хотите попытаться обойти это ограничение, вам, как минимум, придется запустить модифицированную версию Play, которая этого не делает. - person James_pic; 04.02.2014
comment
Хорошо, я думаю, это действительно невозможно. тем не менее, странно, что они написали заметку о том, что это возможно в документации по игре. в любом случае, я не получил то, что хотел, но это пустая трата времени, если никто не получит награду... ;) - person gilad hoch; 05.02.2014
comment
Примечание в документах верное, оно просто не подходит для вашего варианта использования. IIRC, примечание находится в разделе о комете. Comet позволяет серверу отправлять клиенту поток сообщений. Наоборот, клиент отправляет поток сообщений на сервер, что также возможно при творческом использовании BodyParsers. Что невозможно в Play, так это то, что обе эти вещи происходят одновременно (за исключением открытия двух соединений, по одному для каждого направления). - person James_pic; 05.02.2014

«Потоковая передача данных одновременно по одному HTTP-соединению в игре»

Я не закончил читать ни весь ваш вопрос, ни код, но то, что вы просите сделать, недоступно в HTTP. Это не имеет ничего общего с Play.

Когда вы делаете веб-запрос, вы открываете сокет на веб-сервере и отправляете «GET /file.html HTTP/1.1\n[необязательные заголовки]\n[дополнительные заголовки]\n\n»

Вы получаете ответ после (и только после) выполнения запроса (необязательно включая тело запроса как часть запроса). Когда и только когда запрос и завершен, в HTTP 1.1 (но не 1.0) вы можете сделать новый запрос в том же сокете (в http 1.0 вы открываете новый сокет).

Возможно на ответ "зависнуть"... так работают веб-чаты. Сервер просто сидит там, висит на открытом сокете, не отправляя ответ, пока кто-то не отправит вам сообщение. Постоянное соединение с веб-сервером в конечном итоге обеспечивает ответ, когда/если вы получаете сообщение чата.

Точно так же запрос может «зависнуть». Вы можете начать отправлять данные вашего запроса на сервер, немного подождать, а затем завершить запрос, когда вы получите дополнительный пользовательский ввод. Этот механизм обеспечивает лучшую производительность, чем постоянное создание новых HTTP-запросов для каждого пользовательского ввода. Сервер может интерпретировать этот поток данных как поток отдельных входных данных, даже если это не обязательно было первоначальным намерением спецификации HTTP.

HTTP не поддерживает механизм получения части запроса, отправки части ответа и получения большей части запроса. Его просто нет в спецификации. Как только вы начали получать ответ, единственный способ отправить дополнительную информацию на сервер — это использовать другой HTTP-запрос. Вы можете использовать уже открытый параллельно, или вы можете открыть новый, или вы можете выполнить первый запрос/ответ и выдать дополнительный запрос на том же сокете (в 1.1).

Если вам необходимо иметь асинхронный ввод-вывод в одном сокетном соединении, вы можете рассмотреть другой протокол, отличный от HTTP.

person nairbv    schedule 31.01.2014
comment
Технически вы можете отправить другой запрос до получения ответа, как при конвейерной обработке запросов. - person James_pic; 02.02.2014
comment
Интересно, я не знал о конвейерной обработке запросов. Согласно википедии, во всех других браузерах [кроме Opera] конвейерная обработка HTTP отключена или не реализована. Также по-прежнему не похоже, что это активирует запрошенную функциональность. ограничение HTTP 1.1 все еще применяется: сервер должен отправлять свои ответы в том же порядке, в котором были получены запросы ... соединение остается в порядке поступления. ОП в этом случае я получу тело из запроса только после того, как запрос будет завершен, и все данные будут загружены на сервер. но я хочу начать обслуживать запросы по ходу дела. - person nairbv; 04.02.2014
comment
Да, конвейеризация запросов в значительной степени не связана с тем, что пытается сделать OP. Конвейерная обработка запросов является своего рода противоположностью этому, поскольку она позволяет клиенту отправить запрос до получения ответа, а не серверу, отправляющему ответ до получения полного запроса. - person James_pic; 04.02.2014