package mix

/*
Scala 2.10.3

libraryDependencies ++= Seq(
  "com.netflix.rxjava" % "rxjava-scala" % "0.18.3",
  "com.h2database" % "h2" % "1.4.178",
)
*/

import java.sql.Connection
import java.sql.DriverManager

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.control.NonFatal

import rx.lang.scala.Observable
import rx.lang.scala.Subscription

/**
 * Demo: stream rows from a JDBC query through an Rx `Observable` and keep
 * only the rows that were emitted before a fixed timer fires.
 */
object Partial {

  /**
   * Creates an in-memory H2 database, fills a `persons` table with four
   * names, runs [[searchPeople]] against it and prints the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    Class.forName("org.h2.Driver")
    val conn = DriverManager.getConnection("jdbc:h2:mem:")
    try {
      val create = conn.prepareStatement("create table persons (name varchar(100))")
      try create.execute() finally create.close()

      // One parameterized statement reused for every row instead of
      // concatenating the value into the SQL text.
      val insert = conn.prepareStatement("insert into persons(name) values(?)")
      try {
        List("horia", "raluca", "lara", "albert").foreach { p =>
          insert.setString(1, p)
          insert.execute()
        }
      } finally insert.close()

      val found = searchPeople("select name from persons", conn)
      println(found)
    } finally conn.close()
  }

  /**
   * Runs `query` on `conn` on a background thread, emitting the `name`
   * column of each row (with a one-second pause before each emission), and
   * blocks until either the query is exhausted or a 3-second timer fires.
   *
   * @param query SQL text to execute; its result set must expose a `name` column
   * @param conn  open JDBC connection; it is not closed by this method
   * @return the names emitted before the stream completed or the timer fired
   */
  def searchPeople(query: String, conn: Connection): List[String] = {
    // Emits a single tick after 3 seconds; used as the cut-off signal.
    val timeout = Observable.interval(3000.millis).take(1)

    val jdbc = Observable[String](subscriber => {
      // The thread currently executing the query, if any. Guarded by `lock`
      // so that unsubscription can only interrupt the worker while it is
      // still running this task (never a pool thread that has moved on).
      val lock = new Object
      var worker: Option[Thread] = None

      subscriber.add(Subscription {
        // Interrupt the worker, not the thread that happens to call
        // unsubscribe, so a pending sleep is cut short.
        lock.synchronized { worker.foreach(_.interrupt()) }
      })

      Future {
        lock.synchronized { worker = Some(Thread.currentThread()) }
        try {
          val stmt = conn.prepareStatement(query)
          try {
            val rs = stmt.executeQuery()
            try {
              while (!subscriber.isUnsubscribed && rs.next()) {
                Thread.sleep(1000L)
                subscriber.onNext(rs.getString("name"))
              }
            } finally rs.close()
          } finally stmt.close()

          if (!subscriber.isUnsubscribed) {
            subscriber.onCompleted()
          }
        } catch {
          // Raised by Thread.sleep when the unsubscribe action interrupts
          // us: the consumer is gone, so there is nothing left to report.
          case _: InterruptedException => ()
          // Surface query failures instead of losing them inside the Future.
          case NonFatal(e) =>
            if (!subscriber.isUnsubscribed) {
              subscriber.onError(e)
            }
        } finally {
          lock.synchronized {
            worker = None
            // Clear any pending interrupt so the pooled thread is handed
            // back to the ExecutionContext in a clean state.
            Thread.interrupted()
          }
        }
      }
    })

    jdbc.takeUntil(timeout).toBlockingObservable.toList
  }
}