Advertisement
Guest User

Untitled

a guest
Nov 2nd, 2014
143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 0.74 KB | None | 0 0
  1. def observeKey(key: String): Observable[ByteBuf] = {
  2.     var running = true
  3.     var index: Option[String] = None
  4.     def longPoll(obs: Observer[ByteBuf]) = {
  5.       if (running) {
  6.         val current = index.map("?index=" + _).getOrElse("")
  7.         HttpClient.get(URL(s"$agent/$key$current").get).foreach({response =>
  8.           if (response.status == Ok) {
  9.             index = Some(response.headers.get("X-Consul-Index").get)
  10.             response.getContent.foreach(obs.onNext)
  11.           } else {
  12.             running = false
  13.             obs.onCompleted()
  14.           }
  15.         })
  16.       } else {
  17.         obs.onCompleted()
  18.       }
  19.     }
  20.     Observable.create({ obs =>
  21.       longPoll(obs)
  22.       Subscription({running = false})
  23.     })
  24.   }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement