Advertisement
Guest User

Untitled

a guest
Dec 17th, 2017
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.04 KB | None | 0 0
  1. package sMusix.network
  2.  
  3. import akka.actor.{Actor, ActorSystem, Props}
  4. import akka.http.scaladsl.Http
  5. import akka.http.scaladsl.model.StatusCodes
  6. import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
  7. import akka.stream.ActorMaterializer
  8. import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
  9. import akka.{Done, NotUsed}
  10. import sMusix.{Evaluator, Log}
  11.  
  12. import scala.concurrent.Future
  13.  
  /**
    * Transmitter provides the connection with the server.
    *
    * Initialising this object creates an actor system, opens a client-side WebSocket to
    * [[Transmitter.host]], sends a single `spammer` command and forwards every message
    * received from the server to one actor running the [[Transmitter.Rec]] behaviour.
    */
  object Transmitter {
      implicit val system: ActorSystem = ActorSystem()
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      import system.dispatcher

      /**
        * Behaviour for inbound WebSocket messages.
        *
        * A strict text message is logged and parsed as JSON; a successfully parsed value is
        * handed to `Evaluator.processOrder`, a parse failure is logged as a warning.
        * Every other message falls through to the actor's default unhandled-message path.
        */
      object Rec extends Actor {
          override def receive: Receive = {
              case TextMessage.Strict(msg) =>
                  Log.info("Recevied signal " + msg)
                  JsonHelper.parseJson(msg) match {
                      case None => Log.warn("Could not parse JSON")
                      case Some(j) => Evaluator.processOrder(j)
                  }
          }
      }

      //  val host = "ws://echo.websocket.org"
      /** WebSocket endpoint the client connects to. */
      val host = "ws://localhost:6666"

      // `Rec` is a singleton, so exactly one actor is created from it and reused for the
      // whole lifetime of the connection. Spawning a fresh actor per message (as before)
      // leaked one never-stopped actor for every message received.
      private val receiver = system.actorOf(Props(Rec))

      /**
        * Sink that forwards each inbound message to the receiver actor.
        * The tell is non-blocking, so a plain sequential `foreach` is enough and keeps the
        * messages in arrival order. Its future completes when the inbound stream ends.
        */
      val sink: Sink[Message, Future[Done]] = Sink.foreach[Message](msg => receiver ! msg)

      /**
        * Outgoing messages: the `spammer` command followed by a source that never completes.
        *
        * Akka HTTP does not support half-closed WebSockets: if the outgoing stream completes,
        * the whole connection is closed. Concatenating `Source.maybe` keeps the connection
        * open after the command has been sent so that the server's messages can be received.
        */
      val source: Source[Message, NotUsed] =
          Source.single[Message](TextMessage("{\"command\":\"spammer\"}"))
              .concat(Source.maybe[Message])

      /** Client-side WebSocket flow to [[host]]; materializes the upgrade response. */
      val flow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
          Http().webSocketClientFlow(WebSocketRequest(host))

      // Running the graph happens here, during object initialisation.
      val (upgradeResponse, closed) =
          source
              .viaMat(flow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
              .toMat(sink)(Keep.both) // also keep the Future[Done]
              .run()

      /**
        * Completes with `Done` once the server answered the upgrade request with
        * `101 Switching Protocols`; fails with a `RuntimeException` carrying the response
        * status otherwise.
        */
      val connected: Future[Done.type] = upgradeResponse.flatMap { upgrade =>
          if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
              Future.successful(Done)
          } else {
              Future.failed[Done.type](
                  new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
          }
      }

      /** Logs the outcome of the connection attempt once [[connected]] completes. */
      def recv(): Unit = {
          connected.onComplete(Log.info)
      }

      /** Number of times [[spam]] has been called. Plain `var`: not synchronized. */
      var i = 0

      /** Increments [[i]]; the actual sending is currently disabled (see commented line). */
      def spam(): Unit = {
          //      Log.warn("reult: " + Await.result(send.offer(TextMessage("XD " + i)), 123 seconds))
          i += 1
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement