mitrakov

Using wsClient + GraphDSL for streaming CSV

Feb 23rd, 2019
317
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.54 KB | None | 0 0
  1. import javax.inject._
  2.  
  3. import akka.NotUsed
  4. import akka.stream.alpakka.csv.scaladsl.CsvParsing
  5. import akka.stream.alpakka.csv.scaladsl.CsvParsing._
  6. import akka.stream.scaladsl.{Flow, GraphDSL, Sink}
  7. import akka.stream.{ClosedShape, Graph}
  8. import akka.util.ByteString
  9. import play.api.libs.ws.{WSClient, WSResponse}
  10.  
  11. import scala.concurrent.{ExecutionContext, Future}
  12. import scala.util.Try
  13.  
  /** Streams a CSV document fetched with [[WSClient]] into batched inserts. */
  @Singleton
  class A @Inject()(ws: WSClient)(implicit ec: ExecutionContext) {

    /** Stores one batch of formatted CSV lines.
      *
      * Not implemented in this snippet: `???` throws `NotImplementedError` when called.
      * [[runQuery]] counts a batch as stored only when the returned `Try` is a `Success`.
      */
    def insertBatch(s: Seq[String]): Future[Try[Unit]] = ???

    /** Builds (but does not run) a closed graph that streams the CSV body of an HTTP
      * response into [[insertBatch]].
      *
      * Stages: response body bytes -> CSV lines (comma delimiter, double-quote quoting,
      * backslash escape) -> each line's fields decoded as UTF-8 and joined with a single
      * space -> empty strings dropped -> groups of up to 100 lines -> `insertBatch`,
      * with up to 4 batches in flight and results emitted in completion order.
      *
      * A `Failure` returned inside a successful future is simply not counted. A failed
      * future (including an exception thrown by `insertBatch`) fails the whole stream.
      *
      * NOTE(review): `"example.com"` has no scheme; the HTTP client will most likely
      * reject it. Use an absolute `http(s)://` URL.
      * NOTE(review): the response status is not checked, so a non-2xx body would be
      * parsed as CSV as well.
      * NOTE(review): the response body source is presumably single-use, so the
      * returned graph should be materialized at most once. Confirm against Play WS.
      *
      * @return a future holding the graph; the graph's materialized value is the
      *         number of batches for which `insertBatch` returned `Success`.
      */
    def runQuery: Future[Graph[ClosedShape, Future[Int]]] = {
      import akka.stream.scaladsl.Keep

      // The stage blueprints below do not depend on the response, so they are built
      // once rather than inside the response callback.

      // Bytes -> one List of raw field bytes per CSV line.
      val scanLines: Flow[ByteString, List[ByteString], NotUsed] =
        CsvParsing.lineScanner(Comma, DoubleQuote, Backslash)

      // Fields -> a single string (UTF-8 decoded, space separated).
      val toText: Flow[List[ByteString], String, NotUsed] =
        Flow[List[ByteString]].map(_.map(_.utf8String).mkString(" "))

      // Drop empty strings, batch by 100, and insert up to 4 batches concurrently.
      val insertBatches: Flow[String, Try[Unit], NotUsed] =
        Flow[String]
          .filter(_.nonEmpty)
          .grouped(100)
          .mapAsyncUnordered(parallelism = 4)(insertBatch)

      // Number of batches whose insert reported Success.
      val countSuccesses: Sink[Try[Unit], Future[Int]] =
        Sink.fold[Int, Try[Unit]](0)((stored, result) => if (result.isSuccess) stored + 1 else stored)

      ws.url("example.com").stream().map { (response: WSResponse) =>
        // A linear pipeline needs no GraphDSL; Keep.right exposes the sink's
        // Future[Int] as the graph's materialized value.
        response.bodyAsSource
          .via(scanLines)
          .via(toText)
          .via(insertBatches)
          .toMat(countSuccesses)(Keep.right)
      }
    }
  }
Add Comment
Please, Sign In to add comment