Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
  * Scratch driver that exercises `Source.queue` with a small buffer and a slow sink.
  *
  * It materializes a queue-backed source (buffer size 2, `OverflowStrategy.dropHead`)
  * into a sink that sleeps one second per element, offers the integers 1 to 4 back to
  * back, prints the result of every offer together with the milliseconds elapsed since
  * start-up, completes the queue and finally terminates the actor system.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  implicit val system: ActorSystem = ActorSystem("scratch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val start = Instant.now()
  // Milliseconds since `start`; declared as a `def` so it is re-evaluated on every use.
  def elapsed: Long = time.Duration.between(start, Instant.now()).toMillis

  val intSource = Source.queue[Int](2, OverflowStrategy.dropHead)
  val intSink = Sink.foreach[Int] { ii =>
    // NOTE(review): presumably a deliberate stand-in for a slow consumer; it blocks the
    // thread that runs the stream for one second per element.
    Thread.sleep(1000)
    println(s"processing $ii at $elapsed")
  }

  // Keep BOTH materialized values: the queue handle used for offering, and the sink's
  // completion future. `.to(intSink)` would discard the latter. The combiner lambda has
  // the same effect as `Keep.both` without requiring an additional import.
  val (intChannel, sinkDone) =
    intSource.toMat(intSink)((queue, done) => (queue, done)).run()

  // Phase 1: issue every offer up front (the range is strict, so all four offers are
  // made before any callback is registered).
  val pendingOffers = (1 to 4).map { ii =>
    println(s"offer invocation for $ii at $elapsed")
    (ii, intChannel.offer(ii))
  }

  // Phase 2: report the outcome of each offer once its future completes.
  pendingOffers.foreach { case (ii, futureOfferResult) =>
    futureOfferResult.onComplete { offerResult =>
      println(s"offer result for $ii: $offerResult at $elapsed")
    }
  }

  intChannel.complete()

  // Terminate only after the sink has finished (or failed). The queue's own
  // `watchCompletion` signals completion of the source operator, which is not a
  // guarantee that the sink has finished handling the last element.
  sinkDone.onComplete(_ => system.terminate())
}
- offer invocation for 1 at 72
- offer invocation for 2 at 77
- offer invocation for 3 at 77
- offer invocation for 4 at 77
- offer result for 1: Success(Enqueued) at 90
- processing 1 at 1084
- offer result for 2: Success(Enqueued) at 1084
- processing 2 at 2084
- offer result for 3: Success(Enqueued) at 2084
- processing 3 at 3084
- offer result for 4: Success(Enqueued) at 3084
- processing 4 at 4084
Add Comment
Please, Sign In to add comment