Advertisement
Guest User

Untitled

a guest
Aug 28th, 2015
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.81 KB | None | 0 0
  1. import rx.lang.scala.{Subject, Observable}
  2. import slick.backend.DatabasePublisher
  3.  
  4. import scala.concurrent.ExecutionContext
  5. import scala.util.{Failure, Success}
  6.  
  7. trait StreamExtensions {
  8. implicit def extendDatabasePublisher[T](publisher: DatabasePublisher[T]): StreamExtensions.DatabasePublisherExtensions[T] =
  9. StreamExtensions.DatabasePublisherExtensions[T](publisher)
  10. }
  11.  
  12. object StreamExtensions {
  13. implicit class DatabasePublisherExtensions[T](val publisher: DatabasePublisher[T]) extends AnyVal {
  14. def toObservable(implicit ec: ExecutionContext): Observable[T] = {
  15. val subject = Subject[T]()
  16. val future = publisher foreach subject.onNext
  17. future onComplete {
  18. case Success(_) => subject.onCompleted()
  19. case Failure(error) => subject.onError(error)
  20. }
  21. subject
  22. }
  23. }
  24. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement