Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import javax.inject._
- import akka.NotUsed
- import akka.stream.alpakka.csv.scaladsl.CsvParsing
- import akka.stream.alpakka.csv.scaladsl.CsvParsing._
- import akka.stream.scaladsl.{Flow, GraphDSL, Sink}
- import akka.stream.{ClosedShape, Graph}
- import akka.util.ByteString
- import play.api.libs.ws.{WSClient, WSResponse}
- import scala.concurrent.{ExecutionContext, Future}
- import scala.util.Try
/**
 * Fetches a CSV document over HTTP and wires it into a batched-insert stream.
 *
 * @param ws Play WS client used to issue the streaming request
 * @param ec execution context on which the response future is mapped
 */
@Singleton
class A @Inject()(ws: WSClient)(implicit ec: ExecutionContext) {

  /**
   * Inserts one batch of rows.
   *
   * Left unimplemented (`???`) in this file.
   *
   * @param s the rows belonging to the batch
   * @return a future holding the outcome of the insert as a `Try`
   */
  def insertBatch(s: Seq[String]): Future[Try[Unit]] = ???

  /**
   * Requests `example.com` and, once the streamed response is available,
   * assembles a closed graph over its body. The graph is only built here,
   * not run; running it is left to the caller.
   *
   * Stages, in order: CSV line scanning, joining each line's fields with a
   * single space, dropping empty rows, grouping rows into batches of up to
   * 100, passing every batch to [[insertBatch]] with parallelism 4, and
   * finally counting the batches whose result is a success.
   *
   * @return a future of the closed graph; the graph's materialized value is
   *         the number of successful batches
   */
  def runQuery: Future[Graph[ClosedShape, Future[Int]]] = {
    import GraphDSL.Implicits._

    ws.url("example.com").stream().map { resp: WSResponse =>
      val body = resp.bodyAsSource

      // Bytes -> one non-empty, space-joined string per CSV line.
      val rows: Flow[ByteString, String, NotUsed] =
        CsvParsing
          .lineScanner(Comma, DoubleQuote, Backslash)
          .map(fields => fields.map(_.utf8String).mkString(" "))
          .filter(row => row.nonEmpty)

      // Rows -> one Try per inserted batch (emitted unordered).
      val batchResults: Flow[ByteString, Try[Unit], NotUsed] =
        rows
          .grouped(100)
          .mapAsyncUnordered(parallelism = 4)(batch => insertBatch(batch))

      // Tallies successful batches; failed ones leave the count unchanged.
      val successCounter: Sink[Try[_], Future[Int]] =
        Sink.fold[Int, Try[_]](0) { (count, outcome) =>
          if (outcome.isSuccess) count + 1 else count
        }

      GraphDSL.create(successCounter) { implicit builder => counter =>
        body ~> batchResults ~> counter
        ClosedShape
      }
    }
  }
}
Add Comment
Please, Sign In to add comment