SHARE
TWEET

Untitled

a guest Jun 27th, 2019 64 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import akka.actor.ActorSystem
  2. import akka.Done
  3. import akka.http.scaladsl.Http
  4. import akka.stream.ActorMaterializer
  5. import akka.stream.scaladsl._
  6. import akka.http.scaladsl.model._
  7. import akka.http.scaladsl.model.ws._
  8.  
  9. import scala.concurrent._
  10. import scala.concurrent.duration._
  11.  
  12.  
  13. object Main extends App {
  14.   implicit val system = ActorSystem()
  15.   implicit val materializer = ActorMaterializer()
  16.  
  17.   import system.dispatcher
  18.  
  19.   // Future[Done] is the materialized value of Sink.foreach,
  20.   // emitted when the stream completes
  21.   val incoming: Sink[Message, Future[Done]] =
  22.   Sink.foreach[Message] {
  23.     case message: TextMessage.Strict =>
  24.       println(message.text)
  25.   }
  26.  
  27.   // send this as a message over the WebSocket
  28.   val outgoing = Source.single(TextMessage("hello world!"))
  29.  
  30.   // flow to use (note: not re-usable!)
  31.   val webSocketFlow = RestartFlow.withBackoff(
  32.     minBackoff = 3.seconds,
  33.     maxBackoff = 30.seconds,
  34.     randomFactor = 0.2
  35.   ) { () =>
  36.     Http().webSocketClientFlow(WebSocketRequest("ws://127.0.0.1:8080/"))
  37.   }
  38.   // the materialized value is a tuple with
  39.   // upgradeResponse is a Future[WebSocketUpgradeResponse] that
  40.   // completes or fails when the connection succeeds or fails
  41.   // and closed is a Future[Done] with the stream completion from the incoming sink
  42.   val (upgradeResponse, closed) =
  43.   outgoing
  44.     .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
  45.     .toMat(incoming)(Keep.both) // also keep the Future[Done]
  46.     .run()
  47.  
  48.   // just like a regular http request we can access response status which is available via upgrade.response.status
  49.   // status code 101 (Switching Protocols) indicates that server support WebSockets
  50.   val connected = upgradeResponse.flatMap { upgrade =>
  51.     if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
  52.       Future.successful(Done)
  53.     } else {
  54.       throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  55.     }
  56.   }
  57.  
  58.   connected.onComplete(println)
  59.   closed.foreach(_ => println("closed"))
  60.  
  61. }
  62.      
  63. [error] /home/developer/scala/wsrestart/src/main/scala/com/sweetsoft/Main.scala:52:35: value flatMap is not a member of akka.NotUsed
  64. [error]   val connected = upgradeResponse.flatMap { upgrade =>
  65. [error]                                   ^
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top