Guest User

Untitled

a guest
Nov 22nd, 2017
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.72 KB | None | 0 0
  /** Factory for using [[KryoReadStage]] as an Akka Streams `Flow`. */
  object KryoReadStage {

    /**
      * Builds a `Flow` from a freshly constructed `KryoReadStage`.
      *
      * All three arguments are forwarded unchanged to the `KryoReadStage` constructor.
      *
      * @param kryoSupport forwarded to the stage constructor
      * @param `class`     forwarded to the stage constructor
      * @param serializer  forwarded to the stage constructor
      * @tparam T element type of the emitted sequences
      * @return the stage wrapped with `Flow.fromGraph`
      */
    def flow[T](kryoSupport: KryoSupport,
                `class`: Class[T],
                serializer: Serializer[_]): Flow[ByteString, immutable.Seq[T], NotUsed] = {
      val stage = new KryoReadStage[T](kryoSupport, `class`, serializer)
      Flow.fromGraph(stage)
    }
  }
  7.  
  /**
    * Graph stage that turns incoming `ByteString` chunks into batches of objects read with Kryo.
    *
    * On every pushed chunk the stage prepends the bytes left undecoded by the previous push, reads
    * as many objects as possible with `kryo.readObject(input, class, serializer)` and pushes them
    * downstream as one sequence (which may be empty). Bytes following the last successfully read
    * object are buffered and retried together with the next chunk.
    *
    * When upstream completes while undecoded bytes are still buffered, the stage fails with a
    * `KryoException` instead of silently discarding them.
    *
    * @param kryoSupport provides the Kryo instance used for each decoding pass
    * @param `class`     class passed to `kryo.readObject`
    * @param serializer  serializer passed to `kryo.readObject`
    * @tparam T type of the decoded objects
    */
  final class KryoReadStage[T](kryoSupport: KryoSupport,
                               `class`: Class[T],
                               serializer: Serializer[_])
    extends GraphStage[FlowShape[ByteString, immutable.Seq[T]]] {

    // Ports are declared before `shape` so they are initialised when the shape is built.
    private val in: Inlet[ByteString] = Inlet("KryoReadStage.in")
    private val out: Outlet[immutable.Seq[T]] = Outlet("KryoReadStage.out")

    override val shape: FlowShape[ByteString, immutable.Seq[T]] = FlowShape.of(in, out)

    override def toString: String = "KryoReadStage"

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {

        /** Bytes received so far that did not yet form a complete object. */
        private var pending: ByteString = ByteString.empty

        setHandler(in, new InHandler {

          override def onPush(): Unit = {
            val bytes = pending ++ grab(in)
            val (decoded, consumed) = decodeAll(bytes)

            // Keep only the undecoded tail; `compact` copies it so the already decoded
            // prefix of the chunk is not retained.
            pending =
              if (consumed >= bytes.length) ByteString.empty
              else bytes.drop(consumed).compact

            push(out, decoded)
          }

          override def onUpstreamFinish(): Unit =
            if (pending.isEmpty) completeStage()
            else
              failStage(new KryoException(
                s"Stream finished with ${pending.length} trailing byte(s) that could not be decoded"))
        })

        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })

        /**
          * Reads objects from `bytes` until the input is exhausted or a read fails.
          *
          * @return the decoded objects and the number of bytes they occupied
          */
        private def decodeAll(bytes: ByteString): (List[T], Int) =
          Managed(new Input(new ByteBufferBackedInputStream(bytes.asByteBuffer))) { input =>
            kryoSupport.withKryo { kryo =>
              val acc = ListBuffer[T]()
              var consumed = 0
              var incomplete = false

              while (!incomplete && !input.eof()) {
                tryRead(kryo, input) match {
                  case Some(t) =>
                    acc += t
                    // Only advance past fully decoded objects.
                    consumed = input.total().toInt
                  case None =>
                    incomplete = true
                }
              }

              (acc.toList, consumed)
            }
          }

        /**
          * Attempts to read one object; any `KryoException` is reported as `None`.
          *
          * NOTE(review): every `KryoException` is treated as "more bytes needed", so a corrupt
          * payload is buffered until upstream finishes rather than failing immediately.
          */
        private def tryRead(kryo: Kryo, input: Input): Option[T] =
          try {
            Some(kryo.readObject(input, `class`, serializer))
          } catch {
            case _: KryoException => None
          }
      }
  }
  85.  
  /**
    * `InputStream` that reads from the remaining content of a `ByteBuffer`.
    *
    * Reading advances the position of the supplied buffer; the end of the stream is reached
    * when the buffer has no bytes remaining.
    *
    * @param buf buffer to read from
    */
  class ByteBufferBackedInputStream(buf: ByteBuffer) extends InputStream {

    /** Returns the next byte as a value in `0..255`, or `-1` when the buffer is exhausted. */
    override def read(): Int =
      if (buf.hasRemaining) buf.get() & 0xFF // mask so negative bytes are not mistaken for EOF
      else -1

    /**
      * Copies up to `len` bytes into `bytes` starting at `off`.
      *
      * @return the number of bytes copied, `0` when `len` is zero, or `-1` when the buffer
      *         is exhausted
      */
    override def read(bytes: Array[Byte], off: Int, len: Int): Int =
      if (len == 0) 0 // InputStream contract: a zero-length read returns 0, even at end of stream
      else if (!buf.hasRemaining) -1
      else {
        val count = Math.min(len, buf.remaining())
        buf.get(bytes, off, count)
        count
      }

    /** Number of bytes that can still be read from the buffer. */
    override def available(): Int = buf.remaining()

  }
  103.  
  /** Runs an operation against a closeable resource and always closes the resource afterwards. */
  object Managed {

    /** Evidence that a `T` can be viewed as an `AutoCloseable`. */
    type AutoCloseableView[T] = T => AutoCloseable

    /**
      * Applies `op` to `resource` and closes the resource when `op` has finished.
      *
      * If `op` throws, that exception is rethrown; a non-fatal exception thrown by `close()` in
      * that situation is attached to it as a suppressed exception rather than replacing it.
      * If only `close()` throws, its exception propagates.
      *
      * @param resource the resource to use and then close
      * @param op       the operation to run with the resource
      * @tparam T resource type, viewable as `AutoCloseable`
      * @tparam V result type of `op`
      * @return the result of `op`
      */
    def apply[T: AutoCloseableView, V](resource: T)(op: T => V): V = {
      // Remembers the failure of `op` (if any) so `finally` can decide how to treat a close failure.
      var primary: Throwable = null
      try {
        op(resource)
      } catch {
        // Caught only to be recorded; it is rethrown immediately.
        case t: Throwable =>
          primary = t
          throw t
      } finally {
        if (primary == null) resource.close()
        else
          try {
            resource.close()
          } catch {
            case scala.util.control.NonFatal(closeFailure) =>
              // addSuppressed rejects self-suppression, hence the identity check.
              if (closeFailure ne primary) primary.addSuppressed(closeFailure)
          }
      }
    }
  }
  115.  
  /** Gives callers access to a `Kryo` instance for the duration of a function call. */
  trait KryoSupport {

    /**
      * Runs `f` with a `Kryo` instance and returns whatever `f` produces.
      *
      * How the instance is obtained is left to the implementation.
      *
      * @param f function to run with the instance
      * @tparam T result type of `f`
      * @return the result of `f`
      */
    def withKryo[T](f: Kryo => T): T

  }
  119.  
  /**
    * [[KryoSupport]] that runs callbacks through a `KryoPool`.
    *
    * Each `Kryo` created for the pool registers the entries of `KryoSupport.ScalaSerializers`
    * followed by the `serializers` passed to this constructor.
    *
    * @param serializers additional (class, serializer) pairs to register on every new `Kryo`
    */
  class PooledKryoSupport(serializers: (Class[_], Serializer[_])*) extends KryoSupport {

    // Pool built with the builder's softReferences() option enabled.
    private val pool = {
      val kryoFactory = new KryoFactory() {
        override def create(): Kryo = {
          val instance = new Kryo
          val registrations = KryoSupport.ScalaSerializers ++ serializers

          registrations.foreach {
            case (clazz, serializer) => instance.register(clazz, serializer)
          }

          instance
        }
      }

      val builder = new KryoPool.Builder(kryoFactory)
      builder.softReferences().build()
    }

    /** Runs `f` with a `Kryo` supplied by the pool's `run` method and returns its result. */
    override def withKryo[T](f: Kryo => T): T = {
      val callback = new KryoCallback[T] {
        override def execute(kryo: Kryo): T = f(kryo)
      }
      pool.run(callback)
    }

  }
Add Comment
Please, Sign In to add comment