Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Observes `key` by long-polling `$agent/$key`.
 *
 * Each request appends `?index=<last X-Consul-Index>` once an index has been
 * seen. On an `Ok` response the `X-Consul-Index` header is remembered, the
 * response content is forwarded to the observer, and the next poll is issued.
 * Any other status ends the stream with `onCompleted`.
 *
 * @param key path segment appended to `agent` to form the request URL
 * @return an observable of the content of every successful response; the
 *         returned subscription stops further polling when released
 */
def observeKey(key: String): Observable[ByteBuf] = {
  // Written both from the HTTP response callback and from the unsubscribe
  // action; presumably these run on different threads, hence @volatile.
  @volatile var running = true
  @volatile var index: Option[String] = None

  // Explicit result type is required because longPoll now calls itself.
  def longPoll(obs: Observer[ByteBuf]): Unit = {
    if (running) {
      val current = index.map("?index=" + _).getOrElse("")
      HttpClient.get(URL(s"$agent/$key$current").get).foreach({ response =>
        if (response.status == Ok) {
          // NOTE(review): assumes headers.get yields an Option, as the
          // original `.get` call on its result suggests — confirm.
          response.headers.get("X-Consul-Index") match {
            case Some(next) =>
              index = Some(next)
              response.getContent.foreach(obs.onNext)
              // Re-issue the request with the fresh index; without this the
              // stored index was never used and only one poll ever happened.
              longPoll(obs)
            case None =>
              // Without an index the next request could not block on changes,
              // so surface the problem instead of throwing inside the callback.
              running = false
              obs.onError(new IllegalStateException(
                s"Response for key '$key' is missing the X-Consul-Index header"))
          }
        } else {
          running = false
          obs.onCompleted()
        }
      })
    } else {
      obs.onCompleted()
    }
  }

  Observable.create({ obs =>
    longPoll(obs)
    // Releasing the subscription only flips the flag; the poll loop checks it
    // before issuing the next request.
    Subscription({ running = false })
  })
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement