Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.time.{Instant, ZonedDateTime}

import cats.effect.IO
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.Stream
import scodec.bits.ByteVector
/** Type class describing how to render a value of type `A` as a single CSV record.
  *
  * @tparam A the type of value being serialized
  */
trait CSVSerializer[A] {

  /** Renders `a` as one CSV line.
    *
    * The timestamp is supplied in its own parameter list so that a caller can
    * fix it once and reuse the resulting `A => String` function for many values.
    *
    * @param createdAt timestamp made available to the implementation for the record
    * @param a         the value to serialize
    * @return the CSV representation of `a`
    */
  def toCSV(createdAt: Timestamp)(a: A): String
}
/** A single row read from the source table.
  *
  * Declared `final` so that the compiler-generated `equals`/`hashCode`/`copy`
  * cannot be invalidated by subclassing.
  *
  * @param id   value of the `id` column
  * @param name value of the `name` column
  */
final case class DataModel(id: Int, name: String)
/** Entry point: reads `(id, name)` rows from Postgres, renders each row as a
  * CSV line, hands the encoded bytes to [[HiveImporter.writeToHive]] and prints
  * the resulting row count.
  */
object Main extends App {

  /** Renders a [[DataModel]] as `id,name,createdAt`.
    *
    * Fields are joined verbatim with a comma; no quoting or escaping is applied.
    */
  implicit object DataModelCSVSerializer extends CSVSerializer[DataModel] {
    override def toCSV(createdAt: Timestamp)(a: DataModel): String =
      List(a.id.show, a.name.show, createdAt.toString).mkString(",")
  }

  val xa = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver",
    "jdbc:postgresql://localhost:5432/testdb",
    "postgres",
    ""
  )

  // One timestamp (millisecond precision) captured at start-up and shared by every exported row.
  val ts = new Timestamp(Instant.now().toEpochMilli)

  val byteStream: Stream[IO, ByteVector] =
    sql"SELECT id, name FROM table_name"
      .query[DataModel]
      .streamWithChunkSize(5000)
      .transact(xa)
      .map(row => DataModelCSVSerializer.toCSV(ts)(row))
      // Use the charset constant rather than a name lookup by string.
      .map(line => ByteVector.view(line.getBytes(StandardCharsets.UTF_8)))

  // Keep only the last emitted count instead of buffering every element of the
  // result stream; an empty result stream is reported as zero rows.
  private val importedRows: Long =
    HiveImporter
      .writeToHive[IO](byteStream)
      .runLast
      .unsafeRunSync()
      .getOrElse(0L)

  println(s"The no. of records imported is $importedRows")
}
/** Writes a stream of encoded rows into a Hive table as an ORC file. */
object HiveImporter {

  /** Writes every element of `s` as one row and emits the writer's row count once.
    *
    * The metastore client and the ORC writer are acquired through their own
    * `Stream`-based constructors, so their clean-up actions run when the
    * returned stream terminates.
    *
    * @param s stream of encoded rows, one `ByteVector` per row
    * @return a stream that emits a single element: the row count reported by the
    *         writer after all of `s` has been written
    */
  def writeToHive[F[_]](s: Stream[F, ByteVector])(implicit F: Effect[F]): Stream[F, Long] = {
    val thriftUrl = "thrift://localhost:9083"
    for {
      msc      <- HiveMetastore.client[F](thriftUrl)
      table    <- msc.getTable("database", "table_name")
      basePath <- msc.getWritePath(table, None)
      writer   <- OrcWriter.writer[F](thriftUrl, createSerde(thriftUrl, table), getFilePath(basePath))
      // The sink emits one Unit per written row. Binding on it directly would
      // query the row count once per row (and never for an empty input), so the
      // sink output is drained first and the count is appended exactly once.
      // NOTE(review): the count is read before the writer is closed; confirm
      // that `getNumberOfRows` is already accurate at that point.
      rowCount <- s.to(writer.writeRows).drain ++ writer.getRowCount
    } yield rowCount
  }
}
/** Minimal algebra over a Hive metastore, with results delivered as `Stream`s in `F`.
  *
  * @tparam F the effect type the streams run in
  */
trait HiveMetastore[F[_]] {

  /** Looks up the metadata of `table` in `database`.
    *
    * @param database name of the Hive database
    * @param table    name of the table inside that database
    * @return a stream yielding the table metadata
    */
  def getTable(database: String, table: String): Stream[F, Table]

  /** Resolves the path that data for `table` should be written to.
    *
    * @param table        metadata of the target table
    * @param partitionVal optional partition identifier; `None` targets the table itself
    * @return a stream yielding the resolved path
    */
  def getWritePath(table: Table, partitionVal: Option[String]): Stream[F, Path]
}
/** Constructors for [[HiveMetastore]] backed by a Hive `IMetaStoreClient`. */
object HiveMetastore {

  /** Opens a metastore client for `thriftUrl` and exposes it as a [[HiveMetastore]].
    *
    * The client is created when the stream is run and closed by the bracket's
    * release action.
    *
    * @param thriftUrl URL passed to `Utils.getHiveConf` to build the client configuration
    * @return a stream emitting one [[HiveMetastore]] bound to the opened client
    */
  def client[F[_]](thriftUrl: String)(implicit F: Effect[F]): Stream[F, HiveMetastore[F]] = {
    val acquire: F[IMetaStoreClient] =
      F.delay(new HiveMetaStoreClient(Utils.getHiveConf(thriftUrl)))

    def release(msc: IMetaStoreClient): F[Unit] =
      F.delay(msc.close())

    Stream.bracket(acquire)(msc => Stream.eval(mkMetastore[F](msc)), release)
  }

  /** Wraps an already-open client in the [[HiveMetastore]] algebra.
    *
    * Every metastore call is suspended in `F`, so nothing is sent to the
    * metastore until the returned streams are actually run, and failures
    * surface as errors of the stream instead of being thrown while it is built.
    *
    * @param msc an open metastore client; its lifecycle is owned by the caller
    */
  def mkMetastore[F[_]](msc: IMetaStoreClient)(implicit F: Effect[F]): F[HiveMetastore[F]] =
    F.pure {
      new HiveMetastore[F] {
        def getTable(database: String, table: String): Stream[F, Table] =
          Stream.eval(F.delay(msc.getTable(database, table)))

        def getWritePath(table: Table, partitionVal: Option[String]): Stream[F, Path] =
          Stream.eval(F.delay {
            // Use the partition's storage location when a partition is given,
            // otherwise fall back to the table's own storage location.
            val location = partitionVal.fold(table.getSd.getLocation) { value =>
              msc
                .getPartition(table.getDbName, table.getTableName, value)
                .getSd
                .getLocation
            }
            new Path(location)
          })
      }
    }
}
/** Algebra for writing encoded rows to an ORC file.
  *
  * @tparam F the effect type the operations run in
  */
trait OrcWriter[F[_]] {

  /** Writes a single encoded row.
    *
    * @param byteVector the encoded bytes of one row
    */
  def writeBytes(byteVector: ByteVector): F[Unit]

  /** A sink that consumes a stream of encoded rows. */
  def writeRows: Sink[F, ByteVector]

  /** A stream yielding the number of rows reported by the underlying writer. */
  def getRowCount: Stream[F, Long]
}
/** Constructors for [[OrcWriter]] backed by a Hive ORC `Writer`. */
object OrcWriter {

  /** Creates an ORC writer for `filePath` and exposes it as an [[OrcWriter]].
    *
    * The underlying writer is created when the stream is run and closed by the
    * bracket's release action.
    *
    * @param thriftUrl URL passed to `Utils.getHiveConf` to build the writer configuration
    * @param serde     SerDe providing the object inspector and used to decode incoming rows
    * @param filePath  destination of the ORC file
    * @return a stream emitting one [[OrcWriter]] bound to the created writer
    */
  def writer[F[_]](thriftUrl: String, serde: AbstractSerDe, filePath: Path)(
      implicit F: Effect[F]): Stream[F, OrcWriter[F]] = {
    val acquire: F[Writer] =
      F.delay {
        val options = OrcFile.writerOptions(Utils.getHiveConf(thriftUrl))
        options.inspector(serde.getObjectInspector)
        OrcFile.createWriter(filePath, options)
      }

    def release(w: Writer): F[Unit] =
      F.delay(w.close())

    Stream.bracket(acquire)(w => Stream.eval(mkOrcWriter[F](w, serde, filePath)), release)
  }

  /** Wraps an already-created `Writer` in the [[OrcWriter]] algebra.
    *
    * All interaction with the writer is suspended with `F.delay`, so each
    * operation is performed when its effect is run rather than when the effect
    * value is constructed.
    *
    * @param writer   the ORC writer to delegate to; its lifecycle is owned by the caller
    * @param serde    SerDe used to decode each incoming row before it is added
    * @param filePath path of the file being written; deleted if a row fails to be written
    */
  def mkOrcWriter[F[_]](writer: Writer, serde: AbstractSerDe, filePath: Path)(
      implicit F: Effect[F]): F[OrcWriter[F]] = F.pure(
    new OrcWriter[F] {
      override def writeBytes(byteVector: ByteVector): F[Unit] = {
        val addRow: F[Unit] =
          F.delay(writer.addRow(serde.deserialize(new BytesWritable(byteVector.toArray))))

        // On a decode or I/O failure remove the partially written file; the
        // original error is still propagated to the caller by `onError`.
        F.onError(addRow) {
          case _: SerDeException | _: IOException =>
            F.delay {
              FilePathUtils.deleteFilePathIfExists(filePath)
              ()
            }
        }
      }

      override def writeRows: Sink[F, ByteVector] =
        _.evalMap(writeBytes)

      // Read the counter when the stream runs, not when it is constructed.
      override def getRowCount: Stream[F, Long] =
        Stream.eval(F.delay(writer.getNumberOfRows))
    }
  )
}
Add Comment
Please, Sign In to add comment