Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Wraps `s` so that a JVM shutdown terminates it with one final element.
 *
 * Two `Deferred` signals coordinate the JVM shutdown hook with the stream:
 *
 *  - `shutdownInitiated` is completed by the shutdown hook when the JVM begins
 *    shutting down.
 *  - `streamFinished` is completed by the returned stream's finalizer; the
 *    shutdown hook blocks on it, so the hook does not return until the stream
 *    has been finalized.
 *
 * Once `shutdownInitiated` completes, a "poison" stream emits `Some(t)` followed
 * by `None`. It is merged with the elements of `s` (each wrapped in `Some`), and
 * `unNoneTerminate` ends the merged stream at the `None`.
 *
 * NOTE(review): the shutdown hook is registered when the poison stream starts
 * and is never deregistered (the bracket's release action is `IO.unit`).
 * NOTE(review): `s` itself never emits `None`, so termination of the result
 * appears to depend on the shutdown signal - confirm this is intended for
 * streams that can end on their own.
 *
 * @param t final element emitted after shutdown has been initiated; passed
 *          by-name and only referenced when the poison stream is built
 * @param s the stream to run until shutdown
 * @tparam T element type of the stream
 * @return a stream of the elements of `s`, plus `t` once shutdown is initiated
 */
def onShutdown[T](t: => T, s: fs2.Stream[IO, T]): fs2.Stream[IO, T] =
  fs2.Stream.eval {
    Deferred[IO, Unit].flatMap { shutdownInitiated =>
      Deferred[IO, Unit].map { streamFinished =>
        // Registers the JVM shutdown hook. The hook first signals the stream,
        // then blocks the shutdown thread until the stream's finalizer has run.
        val sd = IO {
          sys.addShutdownHook {
            shutdownInitiated.complete(()).unsafeRunSync()
            streamFinished.get.unsafeRunSync()
          }
        }
        // Install the hook, wait for the shutdown signal, then emit the final
        // element followed by the `None` terminator.
        val poison =
          fs2.Stream.bracket(sd >> shutdownInitiated.get)(_ => IO.unit) >>
            fs2.Stream(Some(t), None)
        // The finalizer releases the shutdown hook blocked on `streamFinished`.
        poison.merge(s.map(elem => Some(elem))).unNoneTerminate.onFinalize {
          streamFinished.complete(())
        }
      }
    }
  }.flatten
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement