Advertisement
Guest User

Untitled

a guest
Mar 21st, 2018
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.97 KB | None | 0 0
  /**
    * Flow stage that writes incoming [[DataChunk]]s to an upload stream and reports progress.
    *
    * For every `Chunk` the bytes are written through a `ZstdOutputStream` that sits on top of a
    * 256 KiB `BufferedOutputStream`, a `SHA256ChecksummedOutputStream` and a `WebOutputStream`,
    * and an `UploadedBytes` element is emitted. A `FinishChunk` closes that chain and, when a
    * checksum is available, emits `UploadComplete` with the number of bytes written since the
    * previous `FinishChunk`.
    *
    * @param userToken passed to the `WebOutputStream` opened for each upload
    * @param hostURI   passed to the `WebOutputStream` as its `host`
    */
  class Uploader(userToken: UUID, hostURI: URI) extends GraphStage[FlowShape[DataChunk, UploadResult]] {
    val out: Outlet[UploadResult] = Outlet("Uploader.out")
    val in: Inlet[DataChunk] = Inlet("Uploader.in")

    override val shape: FlowShape[DataChunk, UploadResult] = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        // Stream chain of the upload in progress; both references are null between uploads.
        private var uploadStream: ZstdOutputStream = null
        private var checkedStream: SHA256ChecksummedOutputStream = null
        // Bytes handed to uploadStream since the last FinishChunk.
        private var totalBytes = 0L

        // Deliberately no pull(in) in preStart: input is requested only from onPull, so every
        // onPush is guaranteed to find `out` available and push(out, ...) cannot be rejected.

        /** Closes a still-open upload stream when the stage stops mid-upload. */
        override def postStop(): Unit =
          if (uploadStream != null) {
            println("cleaning up")
            uploadStream.close()
          }

        /** Opens a fresh stream chain targeting `d.location`. */
        private def initStreams(d: Data): Unit = {
          checkedStream = new SHA256ChecksummedOutputStream(new WebOutputStream(userToken, d.location, host = hostURI))
          uploadStream = new ZstdOutputStream(new BufferedOutputStream(checkedStream, 1024 * 256))
        }

        /** Handles one element: writes a `Chunk`, or finalises the upload on `FinishChunk`. */
        private def uploadChunk(d: DataChunk): Unit =
          d match {
            case Chunk(byteString, data) =>
              if (checkedStream == null) initStreams(data)
              val written =
                try {
                  uploadStream.write(byteString.toArray)
                  totalBytes += byteString.size
                  true
                } catch {
                  // Fatal errors (OOM, interrupts, ...) are left to propagate.
                  case scala.util.control.NonFatal(e) =>
                    failStage(e)
                    false
                }
              // Pushing after failStage would throw "Cannot push closed port".
              if (written) push(out, UploadedBytes(byteString.size, data))

            case FinishChunk(data) =>
              // A FinishChunk can arrive without any preceding Chunk (e.g. an empty file); open
              // the streams so the close below produces an empty upload instead of an NPE.
              // NOTE(review): assumes FinishChunk carries the same Data type as Chunk - confirm.
              if (checkedStream == null) initStreams(data)
              uploadStream.close()
              val checksum = checkedStream.getChecksum
              val uploadedBytes = totalBytes
              totalBytes = 0L
              uploadStream = null
              checkedStream = null

              // NOTE(review): assumes getChecksum is a synchronous Option-like value - confirm.
              var emitted = false
              checksum.foreach { cs =>
                push(out, UploadComplete(data, uploadedBytes, cs))
                emitted = true
              }
              // Nothing was pushed, so downstream will not pull again on its own; request the
              // next element ourselves to keep the stream moving.
              if (!emitted) tryPull(in)
          }

        setHandler(in, new InHandler {
          override def onPush(): Unit = uploadChunk(grab(in))

          override def onUpstreamFinish(): Unit = completeStage()
        })

        setHandler(out, new OutHandler {
          override def onPull(): Unit = {
            println("uploader pulled!")
            if (!hasBeenPulled(in)) pull(in)
          }

          override def onDownstreamFinish(): Unit = completeStage()
        })
      }
  }
  75.  
  /**
    * Accessor stage that streams the file behind each incoming [[RawData]] downstream.
    *
    * For every element it opens `d.file`, emits its content as `Chunk`s of at most 1 MiB (one per
    * downstream pull) and then a single `FinishChunk`. The next element is pulled only once the
    * current file has been fully emitted.
    */
  class Reader extends Accessor[RawData] {

    // Not referenced inside this class; presumably consumed by Accessor - confirm.
    def retries = 5

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        // Element currently being streamed; null while waiting for the next one.
        private var data: RawData = null
        // Open handle on data.file; null whenever no file is open.
        private var fileReader: FileInputStream = null

        // Reusable read buffer; ByteString.fromArray copies out of it before it is reused.
        private val bucket = new Array[Byte](1024 * 1024)

        private var dataMap = initializeDataMap

        // Input is pulled on downstream demand only (see onPull), never in preStart.

        /** Closes the file that is still open if the stage stops in the middle of a file. */
        override def postStop(): Unit =
          if (fileReader != null) {
            println("cleaning up")
            fileReader.close()
          }

        setHandler(in, new InHandler {
          override def onPush(): Unit = {
            println("reader pushed!")
            val d = grab(in)
            incrementDataMap(d, dataMap).fold(
              // Stop here on failure: opening the file and pushing after failStage would throw
              // "Cannot push closed port".
              e => failStage(e),
              updated => {
                dataMap = updated
                data = d
                fileReader = new FileInputStream(d.file)
                emitChunk()
              }
            )
          }

          // Keep running while a file is still being emitted; emitChunk completes the stage
          // once that file is done.
          override def onUpstreamFinish(): Unit =
            if (data == null) completeStage()
        })

        setHandler(out, new OutHandler {
          override def onPull(): Unit =
            if (data == null) pull(in) else emitChunk()
        })

        /** Pushes the next chunk of the current file, or `FinishChunk` once it is exhausted. */
        private def emitChunk(): Unit =
          fileReader.read(bucket) match {
            case -1 =>
              push(out, FinishChunk(data))
              data = null
              fileReader.close()
              // Clear the handle so postStop does not close the same stream a second time.
              fileReader = null
              if (isClosed(in)) completeStage()
            case len =>
              push(out, Chunk(ByteString.fromArray(bucket, 0, len), data))
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement