Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package mix
- /*
- Scala 2.10.3
- libraryDependencies ++= Seq(
- "com.netflix.rxjava" % "rxjava-scala" % "0.18.3",
- "com.h2database" % "h2" % "1.4.178"
- )
- */
- import java.sql.Connection
- import java.sql.DriverManager
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.Future
- import scala.concurrent.duration.DurationInt
- import rx.lang.scala.Observable
- import rx.lang.scala.Subscription
/**
 * Demo of "partial results": a slow JDBC query is streamed through an Rx `Observable`
 * and cut off by a timer, so the caller gets whatever rows arrived before the deadline.
 */
object Partial {
  import java.util.concurrent.atomic.AtomicReference
  import scala.concurrent.duration.Duration
  import scala.util.control.NonFatal

  /**
   * Creates an in-memory H2 database with a `persons` table, inserts four names,
   * runs [[searchPeople]] against it and prints the names that were collected.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    Class.forName("org.h2.Driver")
    val conn = DriverManager.getConnection("jdbc:h2:mem:")
    val found =
      try {
        val create = conn.prepareStatement("create table persons (name varchar(100))")
        try create.execute() finally create.close()

        // One parameterized statement reused for every row instead of splicing the
        // value into the SQL text.
        val insert = conn.prepareStatement("insert into persons(name) values(?)")
        try {
          List("horia", "raluca", "lara", "albert").foreach { person =>
            insert.setString(1, person)
            insert.execute()
          }
        } finally insert.close()

        searchPeople("select name from persons", conn)
      } finally conn.close()
    println(found)
  }

  /**
   * Runs `query` on a background thread and blocks until either every row has been
   * read or `timeout` elapses, whichever comes first.
   *
   * The query must return a `name` column; its value is what gets collected.
   * A failure while running the query is rethrown to the caller (via the blocking
   * observable) unless the timeout has already fired.
   *
   * NOTE(review): the background worker may still be unwinding for a short moment
   * after this method returns on timeout; its cleanup is best-effort, so the caller
   * may close `conn` right away.
   *
   * @param query   SQL select statement producing a `name` column
   * @param conn    open JDBC connection the query is executed on
   * @param timeout maximum time to wait for rows; defaults to 3000 milliseconds
   * @return the names emitted before completion or before the timeout fired
   */
  def searchPeople(query: String, conn: Connection, timeout: Duration = 3000.millis): List[String] = {
    // Emits a single tick after `timeout`; used only as the stop signal for takeUntil.
    val deadline = Observable.interval(timeout).take(1)

    val rows = Observable[String] { subscriber =>
      // The thread currently executing the query, or null when no query is running.
      // Reads/writes that must be atomic with interrupt() are guarded by `worker`'s monitor.
      val worker = new AtomicReference[Thread]()

      subscriber.add(Subscription {
        // Unsubscribe runs on whatever thread triggers it (e.g. the Rx timer thread),
        // so the *worker* must be interrupted, not Thread.currentThread().
        worker.synchronized {
          Option(worker.get)
            .filter(_ ne Thread.currentThread()) // no point interrupting ourselves
            .foreach(_.interrupt())
        }
      })

      Future {
        worker.set(Thread.currentThread())
        try {
          val stmt = conn.prepareStatement(query)
          try {
            val rs = stmt.executeQuery()
            try {
              while (!subscriber.isUnsubscribed && rs.next()) {
                // Artificial per-row delay (presumably simulating a slow result set).
                Thread.sleep(1000L)
                // Re-check after the sleep: the timeout may have fired meanwhile.
                if (!subscriber.isUnsubscribed) {
                  subscriber.onNext(rs.getString("name"))
                }
              }
            } finally quietly(rs.close())
          } finally quietly(stmt.close())

          if (!subscriber.isUnsubscribed) {
            subscriber.onCompleted()
          }
        } catch {
          // Interrupted by the unsubscribe callback: cancellation, nothing to report.
          case _: InterruptedException => ()
          // Surface real failures instead of letting the Future swallow them.
          case NonFatal(e) =>
            if (!subscriber.isUnsubscribed) {
              subscriber.onError(e)
            }
        } finally {
          // Detach from the pooled thread and clear any pending interrupt atomically
          // with respect to the unsubscribe callback, so a late cancellation cannot
          // leak an interrupt into an unrelated task scheduled on this thread.
          worker.synchronized {
            worker.set(null)
            Thread.interrupted()
          }
        }
      }
      ()
    }

    rows.takeUntil(deadline).toBlockingObservable.toList
  }

  /**
   * Runs a cleanup action, ignoring non-fatal failures. Used only for closing JDBC
   * resources after the query finished or was cancelled, when the connection may
   * already have been closed by the caller.
   */
  private def quietly(cleanup: => Unit): Unit =
    try cleanup catch { case NonFatal(_) => () }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement