Advertisement
Guest User

Untitled

a guest
Jan 17th, 2017
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.98 KB | None | 0 0
  1. import java.io.InputStream
  2. import java.nio.file.{Files, Path, Paths, StandardOpenOption}
  3.  
  4. import akka.{Done, NotUsed}
  5. import akka.actor.ActorSystem
  6. import akka.stream.{ActorAttributes, ActorMaterializer, IOResult, ThrottleMode}
  7. import akka.stream.scaladsl.{FileIO, Flow, Sink, Source, StreamConverters}
  8. import akka.util.ByteString
  9.  
  10. import scala.concurrent.Future
  11. import scala.concurrent.duration._
  12.  
  /** Demo application: emits ten small `ByteString` payloads and writes them into a
    * sequence of files, rolling over to a new file whenever a size or element-count
    * limit would be exceeded.
    */
  object Test extends App {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Running counter appended to every emitted payload ("dupa1", "dupa2", ...).
    var cnt = 0
    val xml = "dupa"

    /** Ten payloads: the base string with a 1-based running number appended. */
    val source: Source[ByteString, NotUsed] =
      Source
        .repeat(xml)
        .map { e => cnt += 1; s"$e$cnt" }
        .take(10)
  //      .throttle(10, 1 second, 1, ThrottleMode.shaping)
        .map(ByteString(_))

    val sink = Sink.foreach(println)
    val fileSink: Sink[ByteString, Future[Done]] = streamToFiles()

    // `source to fileSink` alone only builds a RunnableGraph blueprint and never runs it,
    // so nothing was ever written. `runWith` materializes the graph and returns the
    // sink's completion future.
    source
      .runWith(fileSink)
      .onComplete { result =>
        result.failed.foreach(e => println(s"stream failed: $e"))
        // Shut the actor system down once the stream is done so the JVM can exit.
        system.terminate()
      }

    /** Builds a sink that appends every incoming element to the current file and rolls
      * over to a new file when adding the element would exceed one of the limits.
      *
      * The first file is named `fileBaseName`; subsequent files are named
      * `fileBaseName` followed by the running file number, starting at 2.
      * Files are opened with CREATE/WRITE/APPEND, so content already present in a file
      * of the same name is kept and appended to.
      *
      * The counters live outside the per-element callback, so they are shared by every
      * materialization of the returned sink; materialize it once.
      *
      * @param sizeLimit    maximum number of bytes (sum of element lengths) per file
      * @param elemLimit    maximum number of elements per file
      * @param fileBaseName name of the first file and prefix of all following files
      * @return a sink whose materialized future completes when the stream finishes
      */
    def streamToFiles(sizeLimit: Int = 10, elemLimit: Int = 10, fileBaseName: String = "someFile"): Sink[ByteString, Future[Done]] = {
      import StandardOpenOption._

      var sizeCnt, elemCnt, fileCnt: Int = 0

      // Advances the file counter and returns the path of the next file to write.
      def nextPath(): Path = {
        fileCnt += 1

        if (fileCnt > 1) Paths.get(s"$fileBaseName$fileCnt")
        else Paths.get(fileBaseName)
      }

      var currPath: Path = nextPath()

      println(s"= create new file: $currPath")

      Sink.foreach[ByteString] { e =>
        val eSize = e.length
        val cntsZeroed = sizeCnt == 0 && elemCnt == 0
        val underLimits = (sizeCnt + eSize <= sizeLimit) && (elemCnt + 1 <= elemLimit)

        // An element that exceeds the limits on its own still goes into the current file
        // when that file is empty; rolling over would only leave an unused file name behind.
        if (underLimits || cntsZeroed) {
          sizeCnt += eSize
          elemCnt += 1
        } else {
          sizeCnt = eSize
          elemCnt = 1

          currPath = nextPath()
          println(s"= create new file: $currPath")
        }

        println("write to file")
        Files.write(currPath, e.toArray, CREATE, WRITE, APPEND)
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement