Advertisement
Guest User

Untitled

a guest
Jun 27th, 2019
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.24 KB | None | 0 0
  1. import akka.actor.ActorSystem
  2. import akka.Done
  3. import akka.http.scaladsl.Http
  4. import akka.stream.ActorMaterializer
  5. import akka.stream.scaladsl._
  6. import akka.http.scaladsl.model._
  7. import akka.http.scaladsl.model.ws._
  8.  
  9. import scala.concurrent._
  10. import scala.concurrent.duration._
  11.  
  12.  
  13. object Main extends App {
  14. implicit val system = ActorSystem()
  15. implicit val materializer = ActorMaterializer()
  16.  
  17. import system.dispatcher
  18.  
  19. // Future[Done] is the materialized value of Sink.foreach,
  20. // emitted when the stream completes
  21. val incoming: Sink[Message, Future[Done]] =
  22. Sink.foreach[Message] {
  23. case message: TextMessage.Strict =>
  24. println(message.text)
  25. }
  26.  
  27. // send this as a message over the WebSocket
  28. val outgoing = Source.single(TextMessage("hello world!"))
  29.  
  30. // flow to use (note: not re-usable!)
  31. val webSocketFlow = RestartFlow.withBackoff(
  32. minBackoff = 3.seconds,
  33. maxBackoff = 30.seconds,
  34. randomFactor = 0.2
  35. ) { () =>
  36. Http().webSocketClientFlow(WebSocketRequest("ws://127.0.0.1:8080/"))
  37. }
  38. // the materialized value is a tuple with
  39. // upgradeResponse is a Future[WebSocketUpgradeResponse] that
  40. // completes or fails when the connection succeeds or fails
  41. // and closed is a Future[Done] with the stream completion from the incoming sink
  42. val (upgradeResponse, closed) =
  43. outgoing
  44. .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
  45. .toMat(incoming)(Keep.both) // also keep the Future[Done]
  46. .run()
  47.  
  48. // just like a regular http request we can access response status which is available via upgrade.response.status
  49. // status code 101 (Switching Protocols) indicates that server support WebSockets
  50. val connected = upgradeResponse.flatMap { upgrade =>
  51. if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
  52. Future.successful(Done)
  53. } else {
  54. throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  55. }
  56. }
  57.  
  58. connected.onComplete(println)
  59. closed.foreach(_ => println("closed"))
  60.  
  61. }
  62.  
  63. [error] /home/developer/scala/wsrestart/src/main/scala/com/sweetsoft/Main.scala:52:35: value flatMap is not a member of akka.NotUsed
  64. [error] val connected = upgradeResponse.flatMap { upgrade =>
  65. [error] ^
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement