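// Streams rows from a Postgres table with doobie, serializes each row to a CSV
// line, and appends the bytes into a Hive-managed ORC file via the Hive metastore.
//
// Note: the stream API used here (runLog, Sink, the two-argument Stream.bracket)
// is the fs2 0.9/0.10-era API, and the ORC writer is the pre-`org.apache.orc`
// Hive API (OrcFile.createWriter with an ObjectInspector); newer versions of
// these libraries spell these operations differently.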
import java.io.IOException
import java.sql.Timestamp
import java.time.Instant

import cats.effect.{Effect, IO}
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.{Sink, Stream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient}
import org.apache.hadoop.hive.metastore.api.Table
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Writer}
import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeException}
import org.apache.hadoop.io.BytesWritable
import scodec.bits.ByteVector

// Type class: render a value of type A as a single CSV line, stamped with a load timestamp.
trait CSVSerializer[A] {
  def toCSV(createdAt: Timestamp)(a: A): String
}

case class DataModel(id: Int, name: String)
object Main extends App {

  implicit object DataModelCSVSerializer extends CSVSerializer[DataModel] {
    override def toCSV(createdAt: Timestamp)(a: DataModel): String =
      List(
        a.id.show,
        a.name.show,
        createdAt.toString
      ).intercalate(",")
  }

  val xa = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver",
    "jdbc:postgresql://localhost:5432/testdb",
    "postgres",
    ""
  )

  // Load timestamp stamped onto every exported row.
  val ts = new Timestamp(Instant.now.toEpochMilli)

  // Stream the query results in chunks, render each row as CSV, and encode it as bytes.
  val byteStream: Stream[IO, ByteVector] =
    sql"SELECT id, name FROM table_name"
      .query[DataModel]
      .streamWithChunkSize(5000)
      .transact(xa)
      .map((dm: DataModel) => DataModelCSVSerializer.toCSV(ts)(dm))
      .map((str: String) => ByteVector.view(str.getBytes("UTF-8")))

  println(
    "The number of records imported is " + HiveImporter
      .writeToHive[IO](byteStream)
      .runLog
      .unsafeRunSync)
}

object HiveImporter {

  def writeToHive[F[_]](s: Stream[F, ByteVector])(implicit F: Effect[F]): Stream[F, Long] = {
    val thriftUrl = "thrift://localhost:9083"
    for {
      msc      <- HiveMetastore.client[F](thriftUrl)
      table    <- msc.getTable("database", "table_name")
      basePath <- msc.getWritePath(table, None)
      writer   <- OrcWriter.writer[F](thriftUrl, createSerde(thriftUrl, table), getFilePath(basePath))
      // `.last` makes this bind emit exactly once, after the whole stream has been
      // written; without it the sink emits a Unit per row, so getRowCount would be
      // re-evaluated and re-emitted once per record.
      _        <- s.to(writer.writeRows).last
      rowCount <- writer.getRowCount
    } yield rowCount
  }
}
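
// createSerde, getFilePath, Utils.getHiveConf, and FilePathUtils.deleteFilePathIfExists
// are referenced but not defined in this paste; they are assumed to be project helpers
// that build a SerDe for the target table, pick a file name under the write path,
// construct a HiveConf pointing at the thrift URL, and delete a path if it exists.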

trait HiveMetastore[F[_]] {
  def getTable(database: String, table: String): Stream[F, Table]
  def getWritePath(table: Table, partitionVal: Option[String]): Stream[F, Path]
}

object HiveMetastore {

  def client[F[_]](thriftUrl: String)(implicit F: Effect[F]): Stream[F, HiveMetastore[F]] = {

    def setup(thriftUrl: String): F[IMetaStoreClient] =
      F.delay {
        new HiveMetaStoreClient(Utils.getHiveConf(thriftUrl))
      }

    def cleanup(msc: IMetaStoreClient): F[Unit] =
      F.delay {
        msc.close()
      }

    // Bracket guarantees the metastore client is closed when the stream terminates,
    // whether normally or with an error.
    Stream.bracket(setup(thriftUrl))({ msc =>
      Stream.eval(mkMetastore[F](msc))
    }, cleanup)
  }

  def mkMetastore[F[_]](msc: IMetaStoreClient)(implicit F: Effect[F]): F[HiveMetastore[F]] =
    F.pure {
      new HiveMetastore[F] {
        def getTable(database: String, table: String): Stream[F, Table] =
          Stream(msc.getTable(database, table)).covary[F]

        def getWritePath(table: Table, partitionVal: Option[String]): Stream[F, Path] = {
          // Prefer the partition's storage location when a partition value is given;
          // otherwise fall back to the table's base location.
          val location = partitionVal
            .map(value =>
              msc
                .getPartition(table.getDbName, table.getTableName, value)
                .getSd
                .getLocation)
            .getOrElse(table.getSd.getLocation)
          Stream(new Path(location)).covary[F]
        }
      }
    }
}

trait OrcWriter[F[_]] {
  def writeBytes(byteVector: ByteVector): F[Unit]
  def writeRows: Sink[F, ByteVector]
  def getRowCount: Stream[F, Long]
}

object OrcWriter {

  def writer[F[_]](thriftUrl: String, serde: AbstractSerDe, filePath: Path)(
      implicit F: Effect[F]): Stream[F, OrcWriter[F]] = {

    def setup(thriftUrl: String, serde: AbstractSerDe, filePath: Path): F[Writer] =
      F.delay {
        val options = OrcFile.writerOptions(Utils.getHiveConf(thriftUrl))
        options.inspector(serde.getObjectInspector)
        OrcFile.createWriter(filePath, options)
      }

    def cleanup(w: Writer): F[Unit] =
      F.delay {
        w.close()
      }

    // Same bracket pattern as the metastore client: the ORC writer is always closed.
    Stream.bracket(setup(thriftUrl, serde, filePath))({ w =>
      Stream.eval(mkOrcWriter[F](w, serde, filePath))
    }, cleanup)
  }

  def mkOrcWriter[F[_]](writer: Writer, serde: AbstractSerDe, filePath: Path)(
      implicit F: Effect[F]): F[OrcWriter[F]] = F.pure(
    new OrcWriter[F] {

      // Deserialize the raw CSV bytes through the SerDe and append the resulting row
      // to the ORC file; on a (de)serialization or I/O failure, delete the partially
      // written file before the error propagates.
      override def writeBytes(byteVector: ByteVector): F[Unit] =
        F.onError(F.catchNonFatal(
          writer.addRow(serde.deserialize(new BytesWritable(byteVector.toArray))))) {
          case _: SerDeException | _: IOException =>
            F.catchNonFatal(FilePathUtils.deleteFilePathIfExists(filePath))
        }

      override def writeRows: Sink[F, ByteVector] =
        _.flatMap(bs => Stream.eval(writeBytes(bs)))

      override def getRowCount: Stream[F, Long] = Stream(writer.getNumberOfRows).covary[F]
    }
  )
}
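
// A sketch of the build dependencies this paste appears to assume. The versions are
// illustrative guesses (the paste does not state them), chosen to match the fs2
// 0.10-era stream API and the pre-`org.apache.orc` Hive ORC writer used above:
//
//   libraryDependencies ++= Seq(
//     "org.tpolecat"    %% "doobie-core"     % "0.5.0",
//     "org.tpolecat"    %% "doobie-postgres" % "0.5.0",
//     "co.fs2"          %% "fs2-core"        % "0.10.0",
//     "org.scodec"      %% "scodec-bits"     % "1.1.5",
//     "org.postgresql"   % "postgresql"      % "42.1.4",
//     "org.apache.hive"  % "hive-exec"       % "2.3.2",
//     "org.apache.hive"  % "hive-metastore"  % "2.3.2"
//   )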