Остановить fs2-поток по истечении таймаута

Я хочу использовать функцию, аналогичную take(n: Int), но во временном измерении: consume(period: Duration. Поэтому я хочу, чтобы поток прервался, если произойдет тайм-аут. Я знаю, что могу скомпилировать поток во что-то вроде IO[List[T]] и отменить его, но тогда я потеряю результат. На самом деле я хочу преобразовать бесконечный поток в ограниченный и сохранить результаты.

Подробнее о более широком масштабе проблемы. У меня есть бесконечный поток событий от брокера обмена сообщениями, но у меня также есть ротация учетных данных для подключения к брокеру. Итак, я хочу использовать поток событий в течение некоторого времени, затем остановиться, получить новые учетные данные, снова подключиться к брокеру, создающему новый поток, и объединить два потока в один.


person Mikhail Golubtsov    schedule 16.05.2019    source источник


Ответы (2)


Есть метод, который делает именно это:

/**
    * Interrupts this stream after the specified duration has passed.
    */
  def interruptAfter[F2[x] >: F[x]: Concurrent: Timer](duration: FiniteDuration): Stream[F2, O]
person Mikhail Golubtsov    schedule 16.05.2019

Вам нужно что-то подобное

import scala.util.Random
import scala.concurrent.ExecutionContext
import fs2._
import fs2.concurrent.SignallingRef
implicit val ex = ExecutionContext.global
implicit val t: Timer[IO] = IO.timer(ex)
implicit val cs: ContextShift[IO] = IO.contextShift(ex)

val effect: IO[Long] = IO.sleep(1.second).flatMap(_ => IO{
  val next = Random.nextLong()
  println("NEXT: " + next)
  next
})
val signal = SignallingRef[IO, Boolean](false).unsafeRunSync()
val timer = Stream.sleep(10.seconds).flatMap(_ => 
  Stream.eval(signal.set(true)).flatMap(_ => 
    Stream.emit(println("Finish")).covary[IO]))

val stream = timer concurrently 
Stream.repeatEval(effect).interruptWhen(signal)

stream.compile.drain.unsafeRunSync()

Также, если вы хотите сохранить результат публикации данных, вам потребуется некоторая неограниченная очередь из fs2 для преобразования опубликованных данных в ваш результат через queue.stream

person Mikhail Nemenko    schedule 16.05.2019