Как начать работу с Akka Streams?

Библиотека Akka Streams уже поставляется с большим количеством богатой документации. . Однако главная проблема для меня в том, что он дает слишком много материала — я чувствую себя совершенно подавленным количеством понятий, которые мне нужно выучить. Многие примеры, показанные там, кажутся очень тяжеловесными и не могут быть легко переведены в реальные варианты использования и поэтому довольно эзотеричны. Я думаю, что это дает слишком много деталей, не объясняя, как построить все строительные блоки вместе и как именно это помогает решать конкретные проблемы.

Есть источники, приемники, потоки, этапы графа, частичные графы, материализация, DSL графа и многое другое, и я просто не знаю, с чего начать. краткое руководство. должно быть отправной точкой, но я этого не понимаю. Он просто добавляет концепции, упомянутые выше, не объясняя их. Кроме того, примеры кода не могут быть выполнены - в них отсутствуют части, что делает более или менее невозможным для меня следить за текстом.

Может ли кто-нибудь объяснить понятия источники, стоки, потоки, этапы графа, частичные графы, материализация и, возможно, некоторые другие вещи, которые я упустил, простыми словами и с простыми примерами, которые не объясняют каждую деталь (и которые, вероятно, все равно не нужны в начало)?


person kiritsuku    schedule 31.01.2016    source источник
comment
А как насчет scalaz-strams (fs2)? Очень практично, но почти без документов...   -  person dk14    schedule 05.02.2016
comment
Для информации это обсуждается в мета   -  person DavidG    schedule 09.02.2016
comment
Как первый человек, который проголосовал за закрытие этого вопроса (после метапотока), позвольте мне сначала сказать, что ваш ответ здесь великолепный. Это действительно всесторонний и, безусловно, очень полезный ресурс. Однако, к сожалению, вопрос, который вы задали, слишком широк для переполнения стека. Если каким-то образом ваш ответ можно опубликовать на вопрос с другой формулировкой, то это здорово, но я не думаю, что это возможно. Я настоятельно рекомендую повторно отправить это как сообщение в блоге или что-то подобное, которое вы и другие можете использовать в качестве справочного ресурса в будущих ответах.   -  person James Donnelly    schedule 09.02.2016
comment
Я думаю, что написание этого вопроса в виде сообщения в блоге было бы неэффективным. Да, это широкий вопрос - и это действительно хороший вопрос. Сужение области его применения не улучшит ее. Предоставленный ответ потрясающий. Я уверен, что Quora была бы счастлива отобрать у SO бизнес для больших вопросов.   -  person Mike Slinn    schedule 25.02.2016
comment
@MikeSlinn не пытайтесь обсуждать с людьми SO соответствующие вопросы, они слепо следуют правилам. Пока вопрос не снимается, я доволен и не хочу переходить на другую платформу.   -  person kiritsuku    schedule 25.02.2016
comment
@sschaef Как педантичен. Да, конечно, правила ничего не стоят, ваше великое «я» знает намного лучше, и все, кто пытается применять правила, просто слепо следуют обману. /разглагольствование. если серьезно, это было бы отличным дополнением к бета-версии документации, если вы в ней участвуете. Вы все еще можете подать заявку и разместить его там, но вы должны, по крайней мере, увидеть, что он не очень подходит для основного сайта.   -  person Félix Adriyel Gagnon-Grenier    schedule 26.05.2016
comment
Закрытие этих вопросов ... действительно странно, я думаю, эти правила связаны с тем, что ТАК хочет получить прибыль. Это не настоящее некоммерческое сообщество с демократическими лидерами.   -  person jhegedus    schedule 29.07.2017
comment
@jhegedus некоторое время назад произошла культурная битва из-за направления, которое выбрал SO. Если вы посмотрите, многие из нынешних лидеров — это люди, которые когда-то отвечали на тривиальные вопросы, но получили от них множество баллов. это немного грустно, правда.   -  person bharal    schedule 19.03.2018
comment
просто пройдите короткий курс Akka streams от Udemy (RockTheJvm). Инструктор Даниэль объясняет это подробно.   -  person Smalltalkguy    schedule 14.09.2020


Ответы (1)


Этот ответ основан на akka-stream версии 2.4.2. В других версиях API может немного отличаться. Зависимость может использоваться sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

Хорошо, давайте начнем. API Akka Streams состоит из трех основных типов. В отличие от реактивных потоков, таких типов много. мощнее и, следовательно, сложнее. Предполагается, что для всех примеров кода уже существуют следующие определения:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

Операторы import необходимы для объявлений типов. system представляет систему акторов Akka, а materializer представляет контекст оценки потока. В нашем случае мы используем ActorMaterializer, что означает, что потоки оцениваются поверх актеров. Оба значения помечены как implicit, что дает компилятору Scala возможность автоматически внедрять эти две зависимости всякий раз, когда они необходимы. Мы также импортируем system.dispatcher, который является контекстом выполнения для Futures.

Новый API

Akka Streams обладает следующими ключевыми свойствами:

  • Они реализуют спецификацию Reactive Streams, три основные цели которой обратное давление, асинхронные и неблокирующие границы, а также совместимость между различными реализациями полностью применимы и к Akka Streams.
  • Они обеспечивают абстракцию для механизма оценки потоков, который называется Materializer.
  • Программы формулируются как повторно используемые строительные блоки, которые представлены тремя основными типами Source, Sink и Flow. Строительные блоки образуют граф, оценка которого основана на Materializer и должна быть явно активирована.

Далее будет дано более глубокое введение в использование трех основных типов.

Источник

Source — это создатель данных, он служит источником ввода для потока. Каждый Source имеет один выходной канал и не имеет входного канала. Все данные проходят через выходной канал к тому, что подключено к Source.

Источник

Изображение взято с сайта boldradius.com.

Source можно создать несколькими способами:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

В приведенных выше случаях мы снабжали Source конечными данными, что означает, что они рано или поздно завершатся. Не следует забывать, что Reactive Streams по умолчанию ленивы и асинхронны. Это означает, что нужно явно запрашивать оценку потока. В Akka Streams это можно сделать с помощью методов run*. runForeach ничем не отличается от хорошо известной функции foreach — добавление run делает явным, что мы запрашиваем оценку потока. Поскольку конечные данные скучны, мы продолжаем с бесконечными:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

С помощью метода take мы можем создать искусственную точку остановки, которая не позволит нам бесконечно оценивать результаты. Так как поддержка актеров встроена, мы также можем легко наполнить поток сообщениями, отправленными актеру:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Мы видим, что Futures выполняются асинхронно в разных потоках, что объясняет результат. В приведенном выше примере буфер для входящих элементов не требуется, и поэтому с помощью OverflowStrategy.fail мы можем настроить, что поток должен завершаться ошибкой при переполнении буфера. Благодаря этому интерфейсу актера мы можем передавать поток через любой источник данных. Неважно, создаются ли данные тем же потоком, другим потоком, другим процессом или поступают из удаленной системы через Интернет.

Раковина

Sink в основном является противоположностью Source. Это конечная точка потока и, следовательно, потребляет данные. Sink имеет один входной канал и не имеет выходного канала. Sinks особенно необходимы, когда мы хотим указать поведение сборщика данных повторно используемым способом и без оценки потока. Уже известные методы run* не позволяют нам использовать эти свойства, поэтому вместо них предпочтительнее использовать Sink.

Раковина

Изображение взято с сайта boldradius.com.

Краткий пример Sink в действии:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

Подключить Source к Sink можно с помощью метода to. Он возвращает так называемый RunnableFlow, который, как мы позже увидим, представляет собой особую форму Flow — поток, который можно выполнить, просто вызвав его метод run().

Runnable Flow

Изображение взято с сайта boldradius.com.

Конечно, можно пересылать все значения, поступающие в приемник, актору:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Поток

Источники данных и приемники хороши, если вам нужно соединение между потоками Akka и существующей системой, но с ними ничего нельзя сделать. Потоки — это последняя недостающая часть базовой абстракции Akka Streams. Они служат связующим звеном между различными потоками и могут использоваться для преобразования его элементов.

Flow

Изображение взято с сайта boldradius.com.

Если Flow соединяется с Source, результатом является новый Source. Аналогично, Flow, соединенный с Sink, создает новый Sink. А Flow, связанный как с Source, так и с Sink, дает RunnableFlow. Следовательно, они находятся между входным и выходным каналом, но сами по себе не соответствуют ни одному из вариантов, пока они не подключены ни к Source, ни к Sink.

Полный поток

Изображение взято с сайта boldradius.com.

Чтобы лучше понять Flows, мы рассмотрим несколько примеров:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

С помощью метода via мы можем соединить Source с Flow. Нам нужно указать тип ввода, потому что компилятор не может вывести его за нас. Как мы уже видим на этом простом примере, потоки invert и double полностью независимы от каких-либо производителей и потребителей данных. Они только преобразуют данные и передают их в выходной канал. Это означает, что мы можем повторно использовать поток среди нескольких потоков:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1 и s2 представляют совершенно новые потоки — они не передают никаких данных через свои строительные блоки.

Неограниченные потоки данных

Прежде чем мы двинемся дальше, мы должны сначала вернуться к некоторым ключевым аспектам Reactive Streams. Неограниченное количество элементов может прибыть в любую точку и поставить поток в разные состояния. Помимо обычного состояния потока, готового к выполнению, поток может быть остановлен либо из-за ошибки, либо из-за сигнала, указывающего на то, что дальнейшие данные не поступят. Поток можно смоделировать графически, отметив события на временной шкале, как здесь:

Показывает, что поток представляет собой последовательность текущих событий, упорядоченных во времени

Изображение взято из Введение в реактивное программирование, которое вы пропустили.

Мы уже видели исполняемые потоки в примерах из предыдущего раздела. Мы получаем RunnableGraph всякий раз, когда поток действительно может быть материализован, что означает, что Sink связано с Source. До сих пор мы всегда материализовались в значение Unit, которое можно увидеть в типах:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

Для Source и Sink параметр второго типа и для Flow параметр третьего типа обозначают материализованное значение. В этом ответе полное значение материализации не должно объясняться. Однако более подробную информацию о материализации можно найти на странице . официальная документация. На данный момент единственное, что нам нужно знать, это то, что материализованное значение — это то, что мы получаем, когда запускаем поток. Поскольку нас пока интересовали только побочные эффекты, мы получили Unit в качестве материализованного значения. Исключением была материализация раковины, результатом которой стало Future. Это вернуло нам Future, так как это значение может обозначать, когда поток, подключенный к приемнику, был завершен. До сих пор предыдущие примеры кода хорошо объясняли концепцию, но они также были скучными, потому что мы имели дело только с конечными потоками или с очень простыми бесконечными потоками. Чтобы сделать его более интересным, далее будет объяснен полный асинхронный и неограниченный поток.

Пример ClickStream

Например, нам нужен поток, который фиксирует события кликов. Чтобы усложнить задачу, предположим, что мы также хотим сгруппировать события кликов, которые происходят через короткое время друг за другом. Таким образом, мы могли легко обнаружить двойные, тройные или десятикратные клики. Кроме того, мы хотим отфильтровать все одиночные клики. Сделайте глубокий вдох и представьте, как бы вы решили эту проблему императивным способом. Бьюсь об заклад, никто не сможет реализовать решение, которое работает правильно с первой попытки. Реактивным способом решить эту проблему несложно. На самом деле решение настолько простое и понятное в реализации, что мы даже можем выразить его на диаграмме, непосредственно описывающей поведение кода:

Пример логики потока кликов

Изображение взято из Введение в реактивное программирование, которое вы пропустили.

Серые прямоугольники — это функции, описывающие преобразование одного потока в другой. С функцией throttle мы накапливаем клики в течение 250 миллисекунд, функции map и filter говорят сами за себя. Цветные сферы представляют события, а стрелки показывают, как они проходят через наши функции. Позже на этапах обработки мы получаем все меньше и меньше элементов, проходящих через наш поток, поскольку мы группируем их вместе и отфильтровываем. Код для этого изображения будет выглядеть примерно так:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

Всю логику можно представить всего в четырех строчках кода! В Scala мы могли бы написать это еще короче:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

Определение clickStream немного сложнее, но это только тот случай, потому что пример программы работает на JVM, где захват событий кликов невозможен. Еще одна сложность заключается в том, что Akka по умолчанию не предоставляет функцию throttle. Вместо этого мы должны были написать это сами. Поскольку эта функция (как и в случае с функциями map или filter) может использоваться повторно в различных вариантах использования, я не считаю эти строки количеством строк, необходимых для реализации логики. Однако в императивных языках нормально, что логику нельзя так просто повторно использовать, и что различные логические шаги происходят в одном месте, а не применяются последовательно, что означает, что мы, вероятно, неправильно сформировали бы наш код с логикой регулирования. Полный пример кода доступен в виде gist и не будет обсуждаться здесь. дальше.

Пример SimpleWebServer

Вместо этого следует обсудить другой пример. Хотя поток кликов является хорошим примером, позволяющим Akka Streams обрабатывать пример реального мира, ему не хватает мощности для демонстрации параллельного выполнения в действии. Следующий пример должен представлять небольшой веб-сервер, который может обрабатывать несколько запросов параллельно. Веб-сервер должен иметь возможность принимать входящие соединения и получать от них последовательности байтов, которые представляют собой печатные символы ASCII. Эти последовательности байтов или строки должны быть разбиты на все символы новой строки на более мелкие части. После этого сервер должен ответить клиенту каждой из разделенных строк. В качестве альтернативы он мог бы сделать что-то еще со строками и выдать специальный токен ответа, но мы хотим, чтобы в этом примере все было просто, и поэтому не нужно вводить какие-либо причудливые функции. Помните, что сервер должен иметь возможность обрабатывать несколько запросов одновременно, что в основном означает, что ни один запрос не может блокировать любой другой запрос от дальнейшего выполнения. Решить все эти требования императивным способом может быть сложно — однако с Akka Streams нам не нужно больше нескольких строк, чтобы решить любое из них. Во-первых, давайте рассмотрим сам сервер:

сервер

По сути, есть только три основных строительных блока. Первый должен принимать входящие соединения. Второй должен обрабатывать входящие запросы, а третий должен отправлять ответ. Реализация всех этих трех строительных блоков лишь немного сложнее, чем реализация потока кликов:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingCnnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

Функция mkServer принимает (помимо адреса и порта сервера) еще акторную систему и материализатор в качестве неявных параметров. Поток управления сервером представлен binding, который берет источник входящих соединений и перенаправляет их в приемник входящих соединений. Внутри connectionHandler, который является нашим приемником, мы обрабатываем каждое соединение потоком serverLogic, который будет описан позже. binding возвращает Future, что завершается, когда сервер был запущен или запуск завершился неудачно, что может быть в том случае, когда порт уже занят другим процессом. Однако код не полностью отражает графику, поскольку мы не видим строительного блока, который обрабатывает ответы. Причина этого в том, что соединение уже само обеспечивает эту логику. Это двунаправленный поток, а не только однонаправленный, как потоки, которые мы видели в предыдущих примерах. Как и в случае с материализацией, такие сложные потоки здесь объясняться не будут. официальная документация содержит множество материалов. для покрытия более сложных графов потоков. Пока достаточно знать, что Tcp.IncomingConnection представляет собой соединение, которое знает, как получать запросы и как отправлять ответы. Часть, которая все еще отсутствует, — это строительный блок serverLogic. Это может выглядеть так:

логика сервера

Опять же, мы можем разделить логику на несколько простых строительных блоков, которые вместе образуют поток нашей программы. Сначала мы хотим разделить нашу последовательность байтов на строки, что мы должны делать всякий раз, когда находим символ новой строки. После этого байты каждой строки необходимо преобразовать в строку, потому что работа с необработанными байтами громоздка. В целом мы могли бы получить бинарный поток сложного протокола, что сделало бы работу с входящими необработанными данными чрезвычайно сложной. Когда у нас есть читаемая строка, мы можем создать ответ. Для простоты в нашем случае ответ может быть любым. В конце концов, нам нужно преобразовать наш ответ в последовательность байтов, которую можно отправить по сети. Код всей логики может выглядеть так:

val serverLogic: Flow[ByteString, ByteString, Unit] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

Мы уже знаем, что serverLogic — это поток, который принимает ByteString и должен произвести ByteString. С помощью delimiter мы можем разделить ByteString на более мелкие части — в нашем случае это должно происходить всякий раз, когда появляется символ новой строки. receiver — это поток, который берет все разделенные последовательности байтов и преобразует их в строку. Это, конечно, опасное преобразование, так как только печатные символы ASCII должны быть преобразованы в строку, но для наших нужд этого достаточно. responder — это последний компонент, отвечающий за создание ответа и обратное преобразование ответа в последовательность байтов. В отличие от графики мы не делили этот последний компонент на два, так как логика тривиальна. В конце соединяем все потоки через функцию via. Здесь можно спросить, позаботились ли мы о многопользовательском свойстве, о котором упоминалось в начале. И действительно, мы это сделали, хотя это может быть неочевидно сразу. Глядя на этот рисунок, должно стать понятнее:

объединение серверной и серверной логики

Компонент serverLogic — это не что иное, как поток, содержащий более мелкие потоки. Этот компонент принимает входные данные, являющиеся запросом, и производит выходные данные, являющиеся ответом. Поскольку потоки могут создаваться несколько раз, и все они работают независимо друг от друга, мы достигаем за счет этого вложения нашего многопользовательского свойства. Каждый запрос обрабатывается в своем собственном запросе, поэтому короткий запрос может превысить ранее запущенный длительный запрос. Если вам интересно, определение serverLogic, показанное ранее, конечно, можно написать намного короче, встроив большинство его внутренних определений:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

Тест веб-сервера может выглядеть так:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

Чтобы приведенный выше пример кода работал правильно, нам сначала нужно запустить сервер, который изображен скриптом startServer:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

Полный пример кода этого простого TCP-сервера можно найти здесь. Мы можем написать не только сервер с Akka Streams, но и клиент. Это может выглядеть так:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

Полный код клиента TCP можно найти здесь. Код выглядит очень похоже, но в отличие от сервера нам больше не нужно управлять входящими соединениями.

Сложные графики

В предыдущих разделах мы видели, как мы можем создавать простые программы из потоков. Однако в действительности зачастую недостаточно просто полагаться на уже встроенные функции для построения более сложных потоков. Если мы хотим иметь возможность использовать Akka Streams для произвольных программ, нам нужно знать, как создавать собственные настраиваемые структуры управления и комбинируемые потоки, которые позволяют нам справляться со сложностью наших приложений. Хорошей новостью является то, что Akka Streams была разработана с учетом потребностей пользователей, и чтобы дать вам краткое введение в более сложные части Akka Streams, мы добавили несколько дополнительных функций в наш пример клиент/сервер.

Одна вещь, которую мы пока не можем сделать, это закрыть соединение. На этом этапе все становится немного сложнее, потому что потоковый API, который мы видели до сих пор, не позволяет нам останавливать поток в произвольной точке. Однако существует абстракция GraphStage, которую можно использовать для создания произвольных этапов обработки графа с любым количеством входных или выходных портов. Давайте сначала посмотрим на серверную часть, где мы представляем новый компонент, названный closeConnection:

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

Этот API выглядит гораздо более громоздким, чем потоковый API. Неудивительно, здесь мы должны сделать много императивных шагов. Взамен мы получаем больший контроль над поведением наших потоков. В приведенном выше примере мы указываем только один входной и один выходной порт и делаем их доступными для системы, переопределяя значение shape. Кроме того, мы определили так называемые InHandler и OutHandler, которые в указанном порядке отвечают за прием и передачу элементов. Если вы внимательно изучили полный пример потока кликов, вы должны уже распознать эти компоненты. В InHandler мы захватываем элемент, и если это строка с одним символом 'q', мы хотим закрыть поток. Чтобы дать клиенту возможность узнать, что поток скоро закроется, мы выводим строку "BYE", после чего сразу же закрываем этап. Компонент closeConnection можно объединить с потоком с помощью метода via, который был представлен в разделе о потоках.

Помимо возможности закрывать соединения, было бы неплохо, если бы мы могли показывать приветственное сообщение вновь созданному соединению. Чтобы сделать это, мы снова должны пойти немного дальше:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

Теперь функция serverLogic принимает входящее соединение в качестве параметра. Внутри его тела мы используем DSL, который позволяет нам описать сложное поведение потока. С помощью welcome мы создаем поток, который может генерировать только один элемент — приветственное сообщение. logic — это то, что было описано как serverLogic в предыдущем разделе. Единственное заметное отличие состоит в том, что мы добавили к нему closeConnection. Теперь начинается самое интересное, связанное с DSL. Функция GraphDSL.create делает доступным конструктор b, который используется для представления потока в виде графа. С помощью функции ~> можно соединить входные и выходные порты друг с другом. Компонент Concat, который используется в примере, может объединять элементы и здесь используется для добавления приветственного сообщения перед другими элементами, исходящими из internalLogic. В последней строке мы делаем доступными только входной порт серверной логики и выходной порт объединенного потока, потому что все остальные порты должны оставаться деталями реализации компонента serverLogic. Для более подробного ознакомления с DSL графа Akka Streams посетите соответствующий раздел в официальная документация. Полный пример кода сложного TCP-сервера и клиента, который может с ним взаимодействовать, можно найти здесь. Всякий раз, когда вы открываете новое соединение с клиента, вы должны увидеть приветственное сообщение, и, набрав "q" на клиенте, вы должны увидеть сообщение о том, что соединение было отменено.

Есть еще некоторые темы, которые не были затронуты в этом ответе. В частности, материализация может напугать того или иного читателя, но я уверен, что с материалом, изложенным здесь, каждый сможет сделать следующие шаги самостоятельно. Как уже было сказано, официальная документация это хорошее место для продолжения изучения Akka Streams.

person kiritsuku    schedule 31.01.2016
comment
@monksy Я не планировал публиковать это где-либо еще. Не стесняйтесь опубликовать это в своем блоге, если хотите. API в настоящее время стабилен в большинстве частей, что означает, что вам, вероятно, даже не нужно заботиться об обслуживании (большинство статей в блогах об Akka Streams устарели, поскольку они показывают API, которого больше не существует). - person kiritsuku; 04.02.2016
comment
Вы можете связаться со мной по адресу shicks stevenkhicks.de, и мы все обсудим? Я не хочу воспроизводить его и искажать его. (Меня больше всего беспокоит, что этот ответ исчезнет) - person monksy; 04.02.2016
comment
Это не исчезнет. Почему? - person kiritsuku; 04.02.2016
comment
@sschaef Это вполне может исчезнуть, поскольку вопрос не по теме и как таковой закрыт. - person DavidG; 09.02.2016
comment
@sschaef Вопрос не по теме. Был закрыт и признан таковым. Вопросы не по теме нужно удалять. Этот вопрос скорее всего утонет и ответ вместе с ним. - person magisch; 09.02.2016
comment
@Magisch Всегда помните: мы не удаляем хороший контент. Я не совсем уверен, но я думаю, что этот ответ действительно может подойти, несмотря ни на что. - person Deduplicator; 09.02.2016
comment
@Deduplicator Я думаю, что это не по теме. Он может быть хорош по другим качествам, но здесь он хорош. Если бы это было опубликовано сегодня, я бы проголосовал за него. - person magisch; 09.02.2016
comment
Этот пост может быть полезен для новой функции документации Stack Overflow — как только она откроется для Scala. - person S.L. Barth; 09.02.2016
comment
Это один из самых полезных ответов о переполнении стека, которые я когда-либо видел. Хотелось бы, чтобы это было в документации Akka. @sshaef, огромное спасибо! - person Louis Cruz; 05.03.2018
comment
Почему один голос за закрытие отменяет 250 голосов за хороший вопрос и 500 голосов за хороший ответ на StackOverflow? Система сломана. - person Roger C S Wernersson; 04.02.2021