Потоковая передача CSV-файла в браузер с помощью akka stream и spray

Как подключить Source[String, Unit] к потоковому актеру?

Я думаю, что это модифицированная версия StreamingActor из https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33. будет работать хорошо, но у меня возникли трудности с соединением частей.

Учитывая source: Source[String, Unit] и ctx: RequestContext, я думаю, модифицированный StreamingActor должен соединяться с actorRefFactory.actorOf(fromSource(source, ctx)).

Для справки, суть выше:

import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}

object StreamingActor {

  // helper methods

  def fromString(iterable: Iterable[String], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = {
    Props(new StreamingActor(iterable, ctx))
  }

  // initial message sent by StreamingActor to itself
  private case object FirstChunk

  // confirmation that given chunk was sent to client
  private case object ChunkAck

}

class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {

  import StreamingActor._

  def actorRefFactory = context

  val chunkIterator: Iterator[HttpData] = chunks.iterator

  self ! FirstChunk

  def receive = {

    // send first chunk to client
    case FirstChunk if chunkIterator.hasNext =>
      val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next()))
      ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck)

    // data stream is empty. Respond with Content-Length: 0 and stop
    case FirstChunk =>
      ctx.responder ! HttpResponse(entity = Empty)
      context.stop(self)

    // send next chunk to client  
    case ChunkAck if chunkIterator.hasNext =>
      val nextChunk = MessageChunk(chunkIterator.next())
      ctx.responder ! nextChunk.withAck(ChunkAck)

    // all chunks were sent. stop.  
    case ChunkAck =>
      ctx.responder ! ChunkedMessageEnd
      context.stop(self)

    //   
    case x => unhandled(x)
  }

}

person ic3b3rg    schedule 05.11.2015    source источник
comment
Требуется ли использование Актера для отправки нескольких значений HttpResponse? Я думаю, вам было бы гораздо лучше отправить 1 HttpResponse с HttpEntity.Chunked, см. stackoverflow.com/questions/33123280/   -  person Ramón J Romero y Vigil    schedule 05.11.2015


Ответы (1)


Я думаю, что ваше использование StreamingActor чрезмерно усложняет основную проблему, которую вы пытаетесь решить. Кроме того, StreamingActor в вопросе создаст несколько значений HttpResponse, по 1 для каждого фрагмента, для одного HttpRequest. Это неэффективно, потому что вы можете просто вернуть 1 HttpReponse с HttpEntity.Chunked в качестве Entity для вашего источника потока данных.

Общий дизайн параллелизма

Актеры для состояния, например. поддержание текущего счетчика между соединениями. И даже тогда Agent покрывает много вопросов с дополнительное преимущество проверки типов (в отличие от Actor.receive, который превращает почтовый ящик недоставленных писем в вашу единственную проверку типов во время выполнения).

Параллельное вычисление, а не состояние, должно обрабатываться (по порядку):

  1. Фьючерсы в первую очередь: компонуемые, безопасная проверка типов во время компиляции и лучший выбор для большинства случаев.

  2. akka Streams: компонуемый, безопасная проверка типа во время компиляции и очень полезный, но есть много накладные расходы в результате удобной функции обратного давления. Пары также являются способом формирования объектов HttpResponse, как показано ниже.

Потоковая передача CSV-файлов

Основной вопрос заключается в том, как передать CSV-файл на http-клиент с помощью Streams. Вы можете начать с создания источника данных и встраивания его в HttpResponse:

def lines() = scala.io.Source.fromFile("DataFile.csv").getLines()

import akka.util.ByteString
import akka.http.model.HttpEntity

def chunkSource : Source[HttpEntity.ChunkStreamPart, Unit] = 
  akka.stream.scaladsl.Source(lines)
                      .map(ByteString.apply)
                      .map(HttpEntity.ChunkStreamPart.apply)

def httpFileResponse = 
  HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, chunkSource))

Затем вы можете предоставить этот ответ для любых запросов:

val fileRequestHandler = {
  case HttpRequest(GET, Uri.Path("/csvFile"), _, _, _) => httpFileResponse
}   

Затем вставьте fileRequestHandler в логику маршрутизации вашего сервера.

person Ramón J Romero y Vigil    schedule 05.11.2015