Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import rx.lang.scala.{Subject, Observable}
- import slick.backend.DatabasePublisher
- import scala.concurrent.ExecutionContext
- import scala.util.{Failure, Success}
- trait StreamExtensions {
- implicit def extendDatabasePublisher[T](publisher: DatabasePublisher[T]): StreamExtensions.DatabasePublisherExtensions[T] =
- StreamExtensions.DatabasePublisherExtensions[T](publisher)
- }
- object StreamExtensions {
- implicit class DatabasePublisherExtensions[T](val publisher: DatabasePublisher[T]) extends AnyVal {
- def toObservable(implicit ec: ExecutionContext): Observable[T] = {
- val subject = Subject[T]()
- val future = publisher foreach subject.onNext
- future onComplete {
- case Success(_) => subject.onCompleted()
- case Failure(error) => subject.onError(error)
- }
- subject
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement