Advertisement
Guest User

Untitled

a guest
May 25th, 2016
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.04 KB | None | 0 0
  1. package com.kaspersky.his.loader.ipwhois
  2.  
  3. import akka.actor.ActorSystem
  4. import akka.stream.scaladsl._
  5. import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
  6. import akka.stream.{FanOutShape2, _}
  7.  
  8. object BidiTest extends App {
  9.   implicit val system = ActorSystem("IPWhoisImporter")
  10.   val decider: Supervision.Decider = {
  11.     case t: Throwable =>
  12.       system.log.error(s"Exception has been thrown: ${t.getClass.getName}: ${t.getMessage}")
  13.       Supervision.Resume
  14.   }
  15.   implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
  16.   implicit val ec = system.dispatcher
  17.  
  18.   case class Range(min: Int, max: Int)
  19.   case class ClientShape(in1: Inlet[Range], next: Outlet[Range], out: Outlet[String]) extends FanOutShape2(FanOutShape.Ports(in1, next :: out :: Nil))
  20.  
  21.   val resultSink = Sink.foreach[String](println)
  22.  
  23.   val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit builder =>
  24.     sink =>
  25.       import GraphDSL.Implicits._
  26.  
  27.       val source = Source.single(Range(0, Int.MaxValue))
  28.  
  29.       val merge = builder.add(Merge[Range](2))
  30.  
  31.       val graphStage = new GraphStage[ClientShape] {
  32.         val in: Inlet[Range] = Inlet("in")
  33.         val next: Outlet[Range] = Outlet("next")
  34.         val out: Outlet[String] = Outlet("out")
  35.  
  36.         override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
  37.           setHandler(in, new InHandler {
  38.             override def onPush(): Unit = {
  39.               val range = grab(in)
  40.               if (range.max < 100) {
  41.                 push(next, Range(range.min + 10, range.max + 10))
  42.               }
  43.             }
  44.           })
  45.         }
  46.         override def shape: ClientShape = ClientShape(in, next, out)
  47.       }
  48.  
  49.       val client: FanOutShape2[Range, Range, String] = builder.add(graphStage)
  50.  
  51.       source ~> merge ~> client.in
  52.       merge <~ client.out0
  53.       client.out1 ~> sink
  54.       ClosedShape
  55.   })
  56.  
  57.   for {
  58.     _ <- g.run()
  59.     _  = system.log.info("Job finished, shutting down...")
  60.     _ <- system.terminate()
  61.   } yield ()
  62. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement