Этот ответ основан на akka-stream
версии 2.4.2
. В других версиях API может немного отличаться. Зависимость может использоваться sbt:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"
Хорошо, давайте начнем. API Akka Streams состоит из трех основных типов. В отличие от реактивных потоков, таких типов много. мощнее и, следовательно, сложнее. Предполагается, что для всех примеров кода уже существуют следующие определения:
import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
Операторы import
необходимы для объявлений типов. system
представляет систему акторов Akka, а materializer
представляет контекст оценки потока. В нашем случае мы используем ActorMaterializer
, что означает, что потоки оцениваются поверх актеров. Оба значения помечены как implicit
, что дает компилятору Scala возможность автоматически внедрять эти две зависимости всякий раз, когда они необходимы. Мы также импортируем system.dispatcher
, который является контекстом выполнения для Futures
.
Новый API
Akka Streams обладает следующими ключевыми свойствами:
- Они реализуют спецификацию Reactive Streams, три основные цели которой обратное давление, асинхронные и неблокирующие границы, а также совместимость между различными реализациями полностью применимы и к Akka Streams.
- Они обеспечивают абстракцию для механизма оценки потоков, который называется
Materializer
.
- Программы формулируются как повторно используемые строительные блоки, которые представлены тремя основными типами
Source
, Sink
и Flow
. Строительные блоки образуют граф, оценка которого основана на Materializer
и должна быть явно активирована.
Далее будет дано более глубокое введение в использование трех основных типов.
Источник
Source
— это создатель данных, он служит источником ввода для потока. Каждый Source
имеет один выходной канал и не имеет входного канала. Все данные проходят через выходной канал к тому, что подключено к Source
.
Изображение взято с сайта boldradius.com.
Source
можно создать несколькими способами:
scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...
scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future
В приведенных выше случаях мы снабжали Source
конечными данными, что означает, что они рано или поздно завершатся. Не следует забывать, что Reactive Streams по умолчанию ленивы и асинхронны. Это означает, что нужно явно запрашивать оценку потока. В Akka Streams это можно сделать с помощью методов run*
. runForeach
ничем не отличается от хорошо известной функции foreach
— добавление run
делает явным, что мы запрашиваем оценку потока. Поскольку конечные данные скучны, мы продолжаем с бесконечными:
scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
С помощью метода take
мы можем создать искусственную точку остановки, которая не позволит нам бесконечно оценивать результаты. Так как поддержка актеров встроена, мы также можем легко наполнить поток сообщениями, отправленными актеру:
def run(actor: ActorRef) = {
Future { Thread.sleep(300); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
.actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(run)
scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1
Мы видим, что Futures
выполняются асинхронно в разных потоках, что объясняет результат. В приведенном выше примере буфер для входящих элементов не требуется, и поэтому с помощью OverflowStrategy.fail
мы можем настроить, что поток должен завершаться ошибкой при переполнении буфера. Благодаря этому интерфейсу актера мы можем передавать поток через любой источник данных. Неважно, создаются ли данные тем же потоком, другим потоком, другим процессом или поступают из удаленной системы через Интернет.
Раковина
Sink
в основном является противоположностью Source
. Это конечная точка потока и, следовательно, потребляет данные. Sink
имеет один входной канал и не имеет выходного канала. Sinks
особенно необходимы, когда мы хотим указать поведение сборщика данных повторно используемым способом и без оценки потока. Уже известные методы run*
не позволяют нам использовать эти свойства, поэтому вместо них предпочтительнее использовать Sink
.
Изображение взято с сайта boldradius.com.
Краткий пример Sink
в действии:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3
Подключить Source
к Sink
можно с помощью метода to
. Он возвращает так называемый RunnableFlow
, который, как мы позже увидим, представляет собой особую форму Flow
— поток, который можно выполнить, просто вызвав его метод run()
.
Изображение взято с сайта boldradius.com.
Конечно, можно пересылать все значения, поступающие в приемник, актору:
val actor = system.actorOf(Props(new Actor {
override def receive = {
case msg => println(s"actor received: $msg")
}
}))
scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...
scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed
Поток
Источники данных и приемники хороши, если вам нужно соединение между потоками Akka и существующей системой, но с ними ничего нельзя сделать. Потоки — это последняя недостающая часть базовой абстракции Akka Streams. Они служат связующим звеном между различными потоками и могут использоваться для преобразования его элементов.
Изображение взято с сайта boldradius.com.
Если Flow
соединяется с Source
, результатом является новый Source
. Аналогично, Flow
, соединенный с Sink
, создает новый Sink
. А Flow
, связанный как с Source
, так и с Sink
, дает RunnableFlow
. Следовательно, они находятся между входным и выходным каналом, но сами по себе не соответствуют ни одному из вариантов, пока они не подключены ни к Source
, ни к Sink
.
Изображение взято с сайта boldradius.com.
Чтобы лучше понять Flows
, мы рассмотрим несколько примеров:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6
С помощью метода via
мы можем соединить Source
с Flow
. Нам нужно указать тип ввода, потому что компилятор не может вывести его за нас. Как мы уже видим на этом простом примере, потоки invert
и double
полностью независимы от каких-либо производителей и потребителей данных. Они только преобразуют данные и передают их в выходной канал. Это означает, что мы можем повторно использовать поток среди нескольких потоков:
scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3
scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1
s1
и s2
представляют совершенно новые потоки — они не передают никаких данных через свои строительные блоки.
Неограниченные потоки данных
Прежде чем мы двинемся дальше, мы должны сначала вернуться к некоторым ключевым аспектам Reactive Streams. Неограниченное количество элементов может прибыть в любую точку и поставить поток в разные состояния. Помимо обычного состояния потока, готового к выполнению, поток может быть остановлен либо из-за ошибки, либо из-за сигнала, указывающего на то, что дальнейшие данные не поступят. Поток можно смоделировать графически, отметив события на временной шкале, как здесь:
Изображение взято из Введение в реактивное программирование, которое вы пропустили.
Мы уже видели исполняемые потоки в примерах из предыдущего раздела. Мы получаем RunnableGraph
всякий раз, когда поток действительно может быть материализован, что означает, что Sink
связано с Source
. До сих пор мы всегда материализовались в значение Unit
, которое можно увидеть в типах:
val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)
Для Source
и Sink
параметр второго типа и для Flow
параметр третьего типа обозначают материализованное значение. В этом ответе полное значение материализации не должно объясняться. Однако более подробную информацию о материализации можно найти на странице . официальная документация. На данный момент единственное, что нам нужно знать, это то, что материализованное значение — это то, что мы получаем, когда запускаем поток. Поскольку нас пока интересовали только побочные эффекты, мы получили Unit
в качестве материализованного значения. Исключением была материализация раковины, результатом которой стало Future
. Это вернуло нам Future
, так как это значение может обозначать, когда поток, подключенный к приемнику, был завершен. До сих пор предыдущие примеры кода хорошо объясняли концепцию, но они также были скучными, потому что мы имели дело только с конечными потоками или с очень простыми бесконечными потоками. Чтобы сделать его более интересным, далее будет объяснен полный асинхронный и неограниченный поток.
Пример ClickStream
Например, нам нужен поток, который фиксирует события кликов. Чтобы усложнить задачу, предположим, что мы также хотим сгруппировать события кликов, которые происходят через короткое время друг за другом. Таким образом, мы могли легко обнаружить двойные, тройные или десятикратные клики. Кроме того, мы хотим отфильтровать все одиночные клики. Сделайте глубокий вдох и представьте, как бы вы решили эту проблему императивным способом. Бьюсь об заклад, никто не сможет реализовать решение, которое работает правильно с первой попытки. Реактивным способом решить эту проблему несложно. На самом деле решение настолько простое и понятное в реализации, что мы даже можем выразить его на диаграмме, непосредственно описывающей поведение кода:
Изображение взято из Введение в реактивное программирование, которое вы пропустили.
Серые прямоугольники — это функции, описывающие преобразование одного потока в другой. С функцией throttle
мы накапливаем клики в течение 250 миллисекунд, функции map
и filter
говорят сами за себя. Цветные сферы представляют события, а стрелки показывают, как они проходят через наши функции. Позже на этапах обработки мы получаем все меньше и меньше элементов, проходящих через наш поток, поскольку мы группируем их вместе и отфильтровываем. Код для этого изображения будет выглядеть примерно так:
val multiClickStream = clickStream
.throttle(250.millis)
.map(clickEvents => clickEvents.length)
.filter(numberOfClicks => numberOfClicks >= 2)
Всю логику можно представить всего в четырех строчках кода! В Scala мы могли бы написать это еще короче:
val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)
Определение clickStream
немного сложнее, но это только тот случай, потому что пример программы работает на JVM, где захват событий кликов невозможен. Еще одна сложность заключается в том, что Akka по умолчанию не предоставляет функцию throttle
. Вместо этого мы должны были написать это сами. Поскольку эта функция (как и в случае с функциями map
или filter
) может использоваться повторно в различных вариантах использования, я не считаю эти строки количеством строк, необходимых для реализации логики. Однако в императивных языках нормально, что логику нельзя так просто повторно использовать, и что различные логические шаги происходят в одном месте, а не применяются последовательно, что означает, что мы, вероятно, неправильно сформировали бы наш код с логикой регулирования. Полный пример кода доступен в виде gist и не будет обсуждаться здесь. дальше.
Пример SimpleWebServer
Вместо этого следует обсудить другой пример. Хотя поток кликов является хорошим примером, позволяющим Akka Streams обрабатывать пример реального мира, ему не хватает мощности для демонстрации параллельного выполнения в действии. Следующий пример должен представлять небольшой веб-сервер, который может обрабатывать несколько запросов параллельно. Веб-сервер должен иметь возможность принимать входящие соединения и получать от них последовательности байтов, которые представляют собой печатные символы ASCII. Эти последовательности байтов или строки должны быть разбиты на все символы новой строки на более мелкие части. После этого сервер должен ответить клиенту каждой из разделенных строк. В качестве альтернативы он мог бы сделать что-то еще со строками и выдать специальный токен ответа, но мы хотим, чтобы в этом примере все было просто, и поэтому не нужно вводить какие-либо причудливые функции. Помните, что сервер должен иметь возможность обрабатывать несколько запросов одновременно, что в основном означает, что ни один запрос не может блокировать любой другой запрос от дальнейшего выполнения. Решить все эти требования императивным способом может быть сложно — однако с Akka Streams нам не нужно больше нескольких строк, чтобы решить любое из них. Во-первых, давайте рассмотрим сам сервер:
а>
По сути, есть только три основных строительных блока. Первый должен принимать входящие соединения. Второй должен обрабатывать входящие запросы, а третий должен отправлять ответ. Реализация всех этих трех строительных блоков лишь немного сложнее, чем реализация потока кликов:
def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
import system.dispatcher
val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
Sink.foreach[Tcp.IncomingConnection] { conn =>
println(s"Incoming connection from: ${conn.remoteAddress}")
conn.handleWith(serverLogic)
}
val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
Tcp().bind(address, port)
val binding: Future[Tcp.ServerBinding] =
incomingCnnections.to(connectionHandler).run()
binding onComplete {
case Success(b) =>
println(s"Server started, listening on: ${b.localAddress}")
case Failure(e) =>
println(s"Server could not be bound to $address:$port: ${e.getMessage}")
}
}
Функция mkServer
принимает (помимо адреса и порта сервера) еще акторную систему и материализатор в качестве неявных параметров. Поток управления сервером представлен binding
, который берет источник входящих соединений и перенаправляет их в приемник входящих соединений. Внутри connectionHandler
, который является нашим приемником, мы обрабатываем каждое соединение потоком serverLogic
, который будет описан позже. binding
возвращает Future
, что завершается, когда сервер был запущен или запуск завершился неудачно, что может быть в том случае, когда порт уже занят другим процессом. Однако код не полностью отражает графику, поскольку мы не видим строительного блока, который обрабатывает ответы. Причина этого в том, что соединение уже само обеспечивает эту логику. Это двунаправленный поток, а не только однонаправленный, как потоки, которые мы видели в предыдущих примерах. Как и в случае с материализацией, такие сложные потоки здесь объясняться не будут. официальная документация содержит множество материалов. для покрытия более сложных графов потоков. Пока достаточно знать, что Tcp.IncomingConnection
представляет собой соединение, которое знает, как получать запросы и как отправлять ответы. Часть, которая все еще отсутствует, — это строительный блок serverLogic
. Это может выглядеть так:
Опять же, мы можем разделить логику на несколько простых строительных блоков, которые вместе образуют поток нашей программы. Сначала мы хотим разделить нашу последовательность байтов на строки, что мы должны делать всякий раз, когда находим символ новой строки. После этого байты каждой строки необходимо преобразовать в строку, потому что работа с необработанными байтами громоздка. В целом мы могли бы получить бинарный поток сложного протокола, что сделало бы работу с входящими необработанными данными чрезвычайно сложной. Когда у нас есть читаемая строка, мы можем создать ответ. Для простоты в нашем случае ответ может быть любым. В конце концов, нам нужно преобразовать наш ответ в последовательность байтов, которую можно отправить по сети. Код всей логики может выглядеть так:
val serverLogic: Flow[ByteString, ByteString, Unit] = {
val delimiter = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true)
val receiver = Flow[ByteString].map { bytes =>
val message = bytes.utf8String
println(s"Server received: $message")
message
}
val responder = Flow[String].map { message =>
val answer = s"Server hereby responds to message: $message\n"
ByteString(answer)
}
Flow[ByteString]
.via(delimiter)
.via(receiver)
.via(responder)
}
Мы уже знаем, что serverLogic
— это поток, который принимает ByteString
и должен произвести ByteString
. С помощью delimiter
мы можем разделить ByteString
на более мелкие части — в нашем случае это должно происходить всякий раз, когда появляется символ новой строки. receiver
— это поток, который берет все разделенные последовательности байтов и преобразует их в строку. Это, конечно, опасное преобразование, так как только печатные символы ASCII должны быть преобразованы в строку, но для наших нужд этого достаточно. responder
— это последний компонент, отвечающий за создание ответа и обратное преобразование ответа в последовательность байтов. В отличие от графики мы не делили этот последний компонент на два, так как логика тривиальна. В конце соединяем все потоки через функцию via
. Здесь можно спросить, позаботились ли мы о многопользовательском свойстве, о котором упоминалось в начале. И действительно, мы это сделали, хотя это может быть неочевидно сразу. Глядя на этот рисунок, должно стать понятнее:
Компонент serverLogic
— это не что иное, как поток, содержащий более мелкие потоки. Этот компонент принимает входные данные, являющиеся запросом, и производит выходные данные, являющиеся ответом. Поскольку потоки могут создаваться несколько раз, и все они работают независимо друг от друга, мы достигаем за счет этого вложения нашего многопользовательского свойства. Каждый запрос обрабатывается в своем собственном запросе, поэтому короткий запрос может превысить ранее запущенный длительный запрос. Если вам интересно, определение serverLogic
, показанное ранее, конечно, можно написать намного короче, встроив большинство его внутренних определений:
val serverLogic = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(msg => s"Server hereby responds to message: $msg\n")
.map(ByteString(_))
Тест веб-сервера может выглядеть так:
$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?
Чтобы приведенный выше пример кода работал правильно, нам сначала нужно запустить сервер, который изображен скриптом startServer
:
$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?
Полный пример кода этого простого TCP-сервера можно найти здесь. Мы можем написать не только сервер с Akka Streams, но и клиент. Это может выглядеть так:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(println)
.map(_ ⇒ StdIn.readLine("> "))
.map(_+"\n")
.map(ByteString(_))
connection.join(flow).run()
Полный код клиента TCP можно найти здесь. Код выглядит очень похоже, но в отличие от сервера нам больше не нужно управлять входящими соединениями.
Сложные графики
В предыдущих разделах мы видели, как мы можем создавать простые программы из потоков. Однако в действительности зачастую недостаточно просто полагаться на уже встроенные функции для построения более сложных потоков. Если мы хотим иметь возможность использовать Akka Streams для произвольных программ, нам нужно знать, как создавать собственные настраиваемые структуры управления и комбинируемые потоки, которые позволяют нам справляться со сложностью наших приложений. Хорошей новостью является то, что Akka Streams была разработана с учетом потребностей пользователей, и чтобы дать вам краткое введение в более сложные части Akka Streams, мы добавили несколько дополнительных функций в наш пример клиент/сервер.
Одна вещь, которую мы пока не можем сделать, это закрыть соединение. На этом этапе все становится немного сложнее, потому что потоковый API, который мы видели до сих пор, не позволяет нам останавливать поток в произвольной точке. Однако существует абстракция GraphStage
, которую можно использовать для создания произвольных этапов обработки графа с любым количеством входных или выходных портов. Давайте сначала посмотрим на серверную часть, где мы представляем новый компонент, названный closeConnection
:
val closeConnection = new GraphStage[FlowShape[String, String]] {
val in = Inlet[String]("closeConnection.in")
val out = Outlet[String]("closeConnection.out")
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush() = grab(in) match {
case "q" ⇒
push(out, "BYE")
completeStage()
case msg ⇒
push(out, s"Server hereby responds to message: $msg\n")
}
})
setHandler(out, new OutHandler {
override def onPull() = pull(in)
})
}
}
Этот API выглядит гораздо более громоздким, чем потоковый API. Неудивительно, здесь мы должны сделать много императивных шагов. Взамен мы получаем больший контроль над поведением наших потоков. В приведенном выше примере мы указываем только один входной и один выходной порт и делаем их доступными для системы, переопределяя значение shape
. Кроме того, мы определили так называемые InHandler
и OutHandler
, которые в указанном порядке отвечают за прием и передачу элементов. Если вы внимательно изучили полный пример потока кликов, вы должны уже распознать эти компоненты. В InHandler
мы захватываем элемент, и если это строка с одним символом 'q'
, мы хотим закрыть поток. Чтобы дать клиенту возможность узнать, что поток скоро закроется, мы выводим строку "BYE"
, после чего сразу же закрываем этап. Компонент closeConnection
можно объединить с потоком с помощью метода via
, который был представлен в разделе о потоках.
Помимо возможности закрывать соединения, было бы неплохо, если бы мы могли показывать приветственное сообщение вновь созданному соединению. Чтобы сделать это, мы снова должны пойти немного дальше:
def serverLogic
(conn: Tcp.IncomingConnection)
(implicit system: ActorSystem)
: Flow[ByteString, ByteString, NotUsed]
= Flow.fromGraph(GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._
val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
val logic = b.add(internalLogic)
val concat = b.add(Concat[ByteString]())
welcome ~> concat.in(0)
logic.outlet ~> concat.in(1)
FlowShape(logic.in, concat.out)
})
Теперь функция serverLogic
принимает входящее соединение в качестве параметра. Внутри его тела мы используем DSL, который позволяет нам описать сложное поведение потока. С помощью welcome
мы создаем поток, который может генерировать только один элемент — приветственное сообщение. logic
— это то, что было описано как serverLogic
в предыдущем разделе. Единственное заметное отличие состоит в том, что мы добавили к нему closeConnection
. Теперь начинается самое интересное, связанное с DSL. Функция GraphDSL.create
делает доступным конструктор b
, который используется для представления потока в виде графа. С помощью функции ~>
можно соединить входные и выходные порты друг с другом. Компонент Concat
, который используется в примере, может объединять элементы и здесь используется для добавления приветственного сообщения перед другими элементами, исходящими из internalLogic
. В последней строке мы делаем доступными только входной порт серверной логики и выходной порт объединенного потока, потому что все остальные порты должны оставаться деталями реализации компонента serverLogic
. Для более подробного ознакомления с DSL графа Akka Streams посетите соответствующий раздел в официальная документация. Полный пример кода сложного TCP-сервера и клиента, который может с ним взаимодействовать, можно найти здесь. Всякий раз, когда вы открываете новое соединение с клиента, вы должны увидеть приветственное сообщение, и, набрав "q"
на клиенте, вы должны увидеть сообщение о том, что соединение было отменено.
Есть еще некоторые темы, которые не были затронуты в этом ответе. В частности, материализация может напугать того или иного читателя, но я уверен, что с материалом, изложенным здесь, каждый сможет сделать следующие шаги самостоятельно. Как уже было сказано, официальная документация это хорошее место для продолжения изучения Akka Streams.
person
kiritsuku
schedule
31.01.2016