Guest User

Untitled

a guest
Apr 24th, 2018
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.22 KB | None | 0 0
  1. import cats.implicits._
  2. import cats.effect.IO
  3. import doobie._
  4. import doobie.implicits._
  5. import fs2.Stream
  6.  
  /**
   * Pages through a database query with doobie and hands every page to a
   * side-effecting consumer (a Redis sink in [[Main.run]]).
   *
   * Paging walks offsets `0, limit, 2 * limit, ...` and stops after the first
   * page whose size differs from `limit`, i.e. the first short or empty page.
   */
  object Main extends App {

    /** Placeholder handle for a Redis connection; it carries no state in this file. */
    final case class RedisClient()

    /** Transactor built from the JDBC `DriverManager` with the connection settings below. */
    val xa = Transactor.fromDriverManager[IO](
      "org.postgresql.Driver", "jdbc:postgresql:world", "postgres", ""
    )

    /** A paged query, called as `(limit, offset)` - the same argument order as [[find]]. */
    type Query[A] = (Int, Int) => ConnectionIO[A]

    /** A side-effecting consumer of one page of results. */
    type Consume[A] = (RedisClient, A) => Unit

    /**
     * Unbounded stream of page offsets: `0, lim, 2 * lim, ...`.
     *
     * @param lim step between consecutive offsets (the page size)
     */
    def offsets(lim: Int): Stream[IO, Int] =
      Stream.iterate(0)(_ + lim)

    /**
     * Passes a [[RedisClient]] to `f` and returns its result. Not implemented yet (`???`).
     *
     * NOTE(review): presumably this acquires a client before calling `f` and releases
     * it afterwards - confirm once it is implemented.
     */
    def withClient[A](f: RedisClient => A): A = ???

    /**
     * Fetches one page of rows. Not implemented yet (`???`).
     *
     * The type parameter `A` does not appear in the parameters or the result type.
     *
     * @param limit  maximum number of rows in the page
     * @param offset number of rows to skip before the page starts
     */
    def find[A](limit: Int, offset: Int): ConnectionIO[List[Int]] = ???

    /** Writes one page of rows using `rc`. Not implemented yet (`???`). */
    def store(rc: RedisClient, rows: List[Int]): Unit = ???

    /**
     * Shared paging loop behind [[queryWithLimit]] and [[run]].
     *
     * For each offset it runs `q(limit, offset)` through `xa`, passes the page to
     * `consume`, and continues while `pageSize(page) == limit`.
     *
     * @param q        paged query, called as `(limit, offset)`
     * @param limit    page size; must be positive
     * @param pageSize how to measure a page, used to detect the last one
     * @param consume  side effect to run for every page, including the last
     * @return an `IO` that completes once the last page has been consumed, or fails
     *         with `IllegalArgumentException` when `limit <= 0`
     */
    private def drainPages[A](q: Query[A], limit: Int)(pageSize: A => Int)(consume: A => Unit): IO[Unit] =
      if (limit <= 0)
        // With limit == 0 every offset is 0 and a page of 0 rows always equals the
        // limit, so the stream would never terminate; reject it up front instead.
        IO.raiseError(new IllegalArgumentException(s"limit must be positive, got $limit"))
      else
        offsets(limit)
          .evalMap(off => q(limit, off).transact(xa))
          .evalMap(page => IO(consume(page)).map(_ => pageSize(page)))
          // takeThrough also emits the first non-matching element, so the final
          // short page has already been consumed when the stream stops.
          .takeThrough(_ == limit)
          .compile
          .drain

    /**
     * Runs `q` page by page and feeds every page to `sink`.
     *
     * No client is passed in, so one is obtained through [[withClient]] for each page.
     *
     * @param q     paged query, called as `(limit, offset)`
     * @param limit page size; must be positive
     * @param sink  consumer invoked once per page
     * @tparam A page type; it must be a collection so that its size can be compared
     *           with `limit` to detect the last page
     */
    def queryWithLimit[A <: Iterable[_]](q: Query[A], limit: Int, sink: Consume[A]): IO[Unit] =
      drainPages(q, limit)(_.size)(page => withClient(rc => sink(rc, page)))

    /**
     * Pages through [[find]] and stores every page with [[store]] using `rc`.
     *
     * @param rc    client handed to `store` for every page
     * @param limit page size; must be positive
     */
    def run(rc: RedisClient, limit: Int): IO[Unit] =
      drainPages[List[Int]]((lim, off) => find(lim, off), limit)(_.size)(rows => store(rc, rows))

    // Builds the program with a page size of 100. Nothing in this file runs the
    // resulting IO (no unsafeRunSync or similar), so no paging happens yet.
    // NOTE(review): withClient returns the IO itself; if withClient releases the
    // client when `f` returns, the IO would execute after the release - confirm.
    val main: IO[Unit] = withClient { rc => run(rc, 100) }
  }
Add Comment
Please, Sign In to add comment