Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- import cats.implicits._
- import cats.effect.IO
- import doobie._
- import doobie.implicits._
- import fs2.Stream
/** Sketch of a job that pages through a Postgres query and hands every page to a Redis sink.
  *
  * `withClient`, `find` and `store` are still unimplemented (`???`).
  */
object Main extends App {

  /** Stand-in for a Redis connection handle; it carries no state yet. */
  case class RedisClient()

  /** Transactor for the `world` Postgres database (user `postgres`, empty password). */
  val xa = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver", "jdbc:postgresql:world", "postgres", ""
  )

  /** A paged query, invoked as `(limit, offset)` -- the same argument order as [[find]]. */
  type Query[A] = (Int, Int) => ConnectionIO[A]

  /** A side-effecting consumer that receives a client and one page of results. */
  type Consume[A] = (RedisClient, A) => Unit

  /** The infinite sequence of page offsets `0, lim, 2 * lim, ...`. */
  def offsets(lim: Int): Stream[IO, Int] =
    Stream.iterate(0)(_ + lim)

  /** Not implemented yet.
    * Presumably lends a client to `f` for the duration of the call -- TODO confirm.
    */
  def withClient[A](f: RedisClient => A): A = ???

  /** Not implemented yet. The type parameter `A` is unused by the signature. */
  def find[A](limit: Int, offset: Int): ConnectionIO[List[Int]] = ???

  /** Not implemented yet. Presumably writes `rows` through `rc` -- TODO confirm. */
  def store(rc: RedisClient, rows: List[Int]): Unit = ???

  /** Shared paging loop used by [[queryWithLimit]] and [[run]].
    *
    * Runs `q(limit, offset)` for offsets `0, limit, 2 * limit, ...`, passes each page to
    * `consume`, and stops after the first page whose size differs from `limit`
    * (that short page is still consumed).
    *
    * Fails with `IllegalArgumentException` when `limit` is not positive: with `limit == 0`
    * the offset never advances and every empty page has `size == limit`, so the loop would
    * never terminate.
    */
  private def drainPages[A <: Iterable[_]](q: Query[A], limit: Int)(consume: A => Unit): IO[Unit] =
    if (limit <= 0)
      IO.raiseError(new IllegalArgumentException(s"limit must be positive, got $limit"))
    else
      offsets(limit)
        .evalMap(off => q(limit, off).transact(xa))
        .evalTap(page => IO(consume(page)))
        .takeThrough(_.size == limit)
        .compile
        .drain

  /** Pages through `q` in steps of `limit` and feeds every page to `sink`.
    *
    * A client is obtained from [[withClient]] separately for each page, inside the effect
    * that performs the write, so `sink` is only ever called while `withClient` is active.
    *
    * @tparam A    page type; bounded by `Iterable` because the loop needs the page size
    *              to detect the last page
    * @param q     paged query, invoked as `(limit, offset)`
    * @param limit page size; must be positive
    * @param sink  consumer invoked once per fetched page
    * @return an `IO` that completes once the final (short) page has been consumed
    */
  def queryWithLimit[A <: Iterable[_]](q: Query[A], limit: Int, sink: Consume[A]): IO[Unit] =
    drainPages(q, limit)(page => withClient(rc => sink(rc, page)))

  /** Pages through [[find]] in steps of `limit` and stores every page via [[store]] using `rc`.
    *
    * @param rc    client passed to every `store` call
    * @param limit page size; must be positive
    * @return an `IO` that completes once the final (short) page has been stored
    */
  def run(rc: RedisClient, limit: Int): IO[Unit] =
    drainPages[List[Int]]((lim, off) => find(lim, off), limit)(rows => store(rc, rows))

  // NOTE(review): this only *builds* the IO; nothing in this object executes it.
  // `withClient` is invoked eagerly during object initialisation, so if it releases the
  // client when `f` returns, the client would already be gone by the time the IO runs --
  // confirm once `withClient` is implemented.
  val main: IO[Unit] = withClient { rc => run(rc, 100) }
}
Add Comment
Please sign in to add a comment.