Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Flow stage that writes incoming [[DataChunk]]s to a remote upload stream and
 * reports progress downstream.
 *
 * For every `Chunk` the bytes are written through a `ZstdOutputStream` that wraps a
 * buffered `SHA256ChecksummedOutputStream` over a `WebOutputStream`, and an
 * `UploadedBytes` element is emitted. A `FinishChunk` closes the current upload and,
 * when a checksum is available, emits `UploadComplete` with the total byte count.
 *
 * @param userToken token handed to the `WebOutputStream` for each upload
 * @param hostURI   host handed to the `WebOutputStream` for each upload
 */
class Uploader(userToken: UUID, hostURI: URI) extends GraphStage[FlowShape[DataChunk, UploadResult]] {
  val out: Outlet[UploadResult] = Outlet("Uploader.out")
  val in: Inlet[DataChunk] = Inlet("Uploader.in")
  override val shape: FlowShape[DataChunk, UploadResult] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // Streams of the upload currently in progress; null while no upload is open.
      private var uploadStream: ZstdOutputStream = null
      private var checkedStream: SHA256ChecksummedOutputStream = null
      // Bytes written to the current upload so far.
      private var totalBytes = 0L

      // No eager pull in preStart: the stage is purely demand-driven, so an element can
      // only arrive after downstream has pulled and `push(out, ...)` is always legal.

      /** Closes an upload that is still open when the stage stops. */
      override def postStop(): Unit = if (uploadStream != null) {
        println("cleaning up")
        uploadStream.close()
      }

      /** Opens the output stream chain for the upload described by `d`. */
      private def initStreams(d: Data): Unit = {
        checkedStream = new SHA256ChecksummedOutputStream(new WebOutputStream(userToken, d.location, host = hostURI))
        uploadStream = new ZstdOutputStream(new BufferedOutputStream(checkedStream, 1024 * 256))
      }

      /** Handles one incoming element: writes a chunk or finalises the current upload. */
      private def uploadChunk(d: DataChunk): Unit = {
        import scala.util.control.NonFatal
        d match {
          case Chunk(byteString, data) =>
            if (checkedStream == null) {
              initStreams(data)
            }
            val written =
              try {
                uploadStream.write(byteString.toArray)
                totalBytes += byteString.size
                true
              } catch {
                // Only non-fatal errors are turned into a stage failure.
                case NonFatal(e) =>
                  failStage(e)
                  false
              }
            // Pushing after failStage would throw on the closed port, so only report success.
            if (written) {
              push(out, UploadedBytes(byteString.size, data))
            }

          case FinishChunk(data) =>
            // A FinishChunk may arrive without any preceding Chunk (e.g. an empty file);
            // open the streams so the close below finalises an empty upload instead of
            // dereferencing null.
            if (uploadStream == null) {
              initStreams(data)
            }
            uploadStream.close()
            // NOTE(review): assumes getChecksum is a synchronous Option-like value — confirm.
            var emitted = false
            checkedStream.getChecksum.foreach { cs =>
              push(out, UploadComplete(data, totalBytes, cs))
              emitted = true
            }
            totalBytes = 0L
            uploadStream = null
            checkedStream = null
            // Nothing was emitted, so downstream demand is still outstanding: ask for the
            // next element ourselves, otherwise the stream would stall.
            if (!emitted && !isClosed(in) && !hasBeenPulled(in)) {
              pull(in)
            }
        }
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val c = grab(in)
          uploadChunk(c)
        }

        override def onUpstreamFinish(): Unit = {
          completeStage()
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          println("uploader pulled!")
          if (!hasBeenPulled(in))
            pull(in)
        }

        override def onDownstreamFinish(): Unit = {
          completeStage()
        }
      })
    }
  }
}
/**
 * Stage that turns each incoming [[RawData]] into a sequence of `Chunk` elements read
 * from `d.file`, followed by a single `FinishChunk`.
 *
 * The file is read in blocks of up to 1 MiB, one block per downstream pull. A new
 * `RawData` is only requested from upstream once the current file has been fully emitted.
 */
class Reader extends Accessor[RawData] {
  def retries = 5

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // Item currently being read; null while idle.
      private var data: RawData = null
      // Open reader for `data.file`; null while idle.
      private var fileReader: FileInputStream = null
      // Reusable read buffer; its contents are copied into each emitted ByteString.
      private val bucket = Array.fill[Byte](1024 * 1024)(0)
      private var dataMap = initializeDataMap

      // No eager pull in preStart: upstream is only pulled on downstream demand.

      /** Closes the file that is still open when the stage stops. */
      override def postStop(): Unit = if (fileReader != null) {
        println("cleaning up")
        fileReader.close()
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          println("reader pushed!")
          val d = grab(in)
          incrementDataMap(d, dataMap).fold(
            // Stop here on failure: opening the file and pushing after failStage would
            // act on an already-failed stage.
            e => failStage(e),
            updated => {
              dataMap = updated
              data = d
              fileReader = new FileInputStream(d.file)
              emitChunk()
            }
          )
        }

        override def onUpstreamFinish(): Unit = {
          // If a file is still being emitted, completion is deferred to emitChunk().
          if (data == null) {
            completeStage()
          }
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (data == null) {
            pull(in)
          } else {
            emitChunk()
          }
        }
      })

      /** Emits the next block of the current file, or `FinishChunk` at end of file. */
      private def emitChunk(): Unit = {
        val len = fileReader.read(bucket)
        if (len != -1) {
          push(out, Chunk(ByteString.fromArray(bucket, 0, len), data))
        } else {
          push(out, FinishChunk(data))
          data = null
          fileReader.close()
          // Clear the reference so postStop does not close the same reader again.
          fileReader = null
          if (isClosed(in)) {
            completeStage()
          }
        }
      }
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement