Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.actor.ActorSystem
- import akka.Done
- import akka.http.scaladsl.Http
- import akka.stream.ActorMaterializer
- import akka.stream.scaladsl._
- import akka.http.scaladsl.model._
- import akka.http.scaladsl.model.ws._
- import scala.concurrent._
- import scala.concurrent.duration._
/** WebSocket client that connects to `ws://127.0.0.1:8080/`, sends a single
  * text message and prints every strict text message it receives.
  *
  * The connection is wrapped in `RestartFlow.withBackoff`, so a new
  * `webSocketClientFlow` is created for every (re)connection attempt.
  */
object Main extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Future[Done] is the materialized value of Sink.foreach,
  // emitted when the stream completes
  val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
      // Streamed and binary messages are not printed; their data streams are
      // drained so an unexpected message type does not fail the sink with a
      // MatchError or leave an unconsumed sub-stream behind.
      case streamed: TextMessage =>
        streamed.textStream.runWith(Sink.ignore)
        ()
      case binary: BinaryMessage =>
        binary.dataStream.runWith(Sink.ignore)
        ()
    }

  // send this as a message over the WebSocket
  // NOTE(review): Source.single completes right after emitting its element;
  // confirm against the RestartFlow docs whether upstream completion ends the
  // stream without further restarts. If a long-lived connection is wanted,
  // the source probably has to be kept open.
  val outgoing = Source.single(TextMessage("hello world!"))

  /** Turns an upgrade response into a success/failure signal.
    *
    * Status code 101 (Switching Protocols) indicates that the server accepted
    * the WebSocket upgrade; any other status yields a failed future.
    */
  private def verifyUpgrade(upgrade: WebSocketUpgradeResponse): Future[Done] =
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
    }

  // RestartFlow.withBackoff materializes akka.NotUsed, not the wrapped flow's
  // Future[WebSocketUpgradeResponse] (this is what the original
  // "value flatMap is not a member of akka.NotUsed" error was about).
  // The upgrade response is therefore inspected inside the factory, once per
  // connection attempt, and its outcome is printed there.
  val webSocketFlow: Flow[Message, Message, akka.NotUsed] = RestartFlow.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    Http()
      .webSocketClientFlow(WebSocketRequest("ws://127.0.0.1:8080/"))
      .mapMaterializedValue { upgradeResponse =>
        upgradeResponse.flatMap(verifyUpgrade).onComplete(println)
      }
  }

  // closed is a Future[Done] with the stream completion from the incoming sink
  val closed: Future[Done] =
    outgoing
      .via(webSocketFlow)
      .toMat(incoming)(Keep.right) // keep only the sink's Future[Done]
      .run()

  closed.foreach(_ => println("closed"))
}
- [error] /home/developer/scala/wsrestart/src/main/scala/com/sweetsoft/Main.scala:52:35: value flatMap is not a member of akka.NotUsed
- [error] val connected = upgradeResponse.flatMap { upgrade =>
- [error] ^
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement