Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.kaspersky.his.loader.ipwhois
- import akka.actor.ActorSystem
- import akka.stream.scaladsl._
- import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
- import akka.stream.{FanOutShape2, _}
- object BidiTest extends App {
- implicit val system = ActorSystem("IPWhoisImporter")
- val decider: Supervision.Decider = {
- case t: Throwable =>
- system.log.error(s"Exception has been thrown: ${t.getClass.getName}: ${t.getMessage}")
- Supervision.Resume
- }
- implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
- implicit val ec = system.dispatcher
- case class Range(min: Int, max: Int)
- case class ClientShape(in1: Inlet[Range], next: Outlet[Range], out: Outlet[String]) extends FanOutShape2(FanOutShape.Ports(in1, next :: out :: Nil))
- val resultSink = Sink.foreach[String](println)
- val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit builder =>
- sink =>
- import GraphDSL.Implicits._
- val source = Source.single(Range(0, Int.MaxValue))
- val merge = builder.add(Merge[Range](2))
- val graphStage = new GraphStage[ClientShape] {
- val in: Inlet[Range] = Inlet("in")
- val next: Outlet[Range] = Outlet("next")
- val out: Outlet[String] = Outlet("out")
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- setHandler(in, new InHandler {
- override def onPush(): Unit = {
- val range = grab(in)
- if (range.max < 100) {
- push(next, Range(range.min + 10, range.max + 10))
- }
- }
- })
- }
- override def shape: ClientShape = ClientShape(in, next, out)
- }
- val client: FanOutShape2[Range, Range, String] = builder.add(graphStage)
- source ~> merge ~> client.in
- merge <~ client.out0
- client.out1 ~> sink
- ClosedShape
- })
- for {
- _ <- g.run()
- _ = system.log.info("Job finished, shutting down...")
- _ <- system.terminate()
- } yield ()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement