Как программно завершить работу fs2.StreamApp?

Расширение StreamApp просит вас предоставить stream def. Имеет параметр requestShutdown.

def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode]

Я предоставляю для этого реализацию и понимаю, что args передается в качестве аргументов командной строки. Однако я не уверен, что предоставляет параметр requestShutdown и что я могу с ним сделать.

В частности, я хотел бы вызвать плавное завершение работы Stream[IO, ExitCode], который запускает сервер Http4s (который блокируется навсегда).

Похоже, нужен Signal и его нужно установить? Базовый поток, к которому я пытаюсь «добраться», выглядит так:

for {
   scheduler <- Scheduler[IO](corePoolSize = 1)
   exitCode  <- BlazeBuilder[IO]
                    .bindHttp(port, "0.0.0.0")
                    .mountService(services(scheduler), "/")
                    .serve
    } yield exitCode

Мой stream def: здесь и StreamAppSpec из проекта fs2 есть что-то в _ 12_, но я не могу понять, как бы это приспособить.


person Toby    schedule 06.03.2018    source источник


Ответы (1)


Вы можете думать о параметре requestShutdown, который передается в функцию потока, как о значении действия, при выполнении которого будет запрашиваться завершение программы.

Его выполнение, следовательно, приведет к завершению программы.

Вот пример использования:

  override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
    for {
      scheduler <- Scheduler[IO](corePoolSize = 1)
      exitStream = scheduler.sleep[IO](10 seconds)
       .evalMap(_ => requestShutdown)
       .map(_ => ExitCode.Success)
      serverStream = BlazeBuilder[IO]
        .bindHttp(port, "0.0.0.0")
        .mountService(services(scheduler), "/")
        .serve
      result <- Stream.emits(List(exitStream, serverStream)).joinUnbounded
    } yield result

В этом сценарии мы создаем два потока:

  • Первый будет ждать 10 секунд, прежде чем вызвать эффект закрытия приложения.

  • Второй будет запускать сервер http4s.

Затем мы объединяем эти два потока, чтобы они работали одновременно, что означает, что веб-сервер будет работать в течение 10 секунд, прежде чем другой поток сигнализирует о том, что программа должна завершиться.

person TheInnerLight    schedule 07.03.2018
comment
Спасибо. Любые предложения по подключению чего-либо, например, тестов, которые могут выключить сервер? До StreamApp я делал это вручную с защелкой обратного отсчета, поэтому я не чувствую, что перехожу к _2 _... - person Toby; 16.03.2018
comment
@Toby Я имею в виду, что самый простой способ заставить его - просто вызвать requestShutdown.unsafeRunSync(), чтобы вызвать побочный эффект в подходящий момент. Это не то, что я предлагаю в производственном коде, но это простой способ его протестировать и посмотреть, как он работает. - person TheInnerLight; 16.03.2018
comment
@Toby При более сложном подходе вы можете легко переместить exitStream (или что-то подобное) за пределы StreamApp, а затем присоединиться к этому потоку с stream из StreamApp, чтобы вы не загрязняли код своего реального сервера логикой тестового выключения. - person TheInnerLight; 16.03.2018
comment
Я пойду и доложу, если что-нибудь интересное придумаю - person Toby; 16.03.2018
comment
Я кое-что сделал. Это не очень интересно! ru / tobyweston / temperature-machine / blob / - person Toby; 28.03.2018