Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- object KryoReadStage {
- def flow[T](kryoSupport: KryoSupport,
- `class`: Class[T],
- serializer: Serializer[_]): Flow[ByteString, immutable.Seq[T], NotUsed] =
- Flow.fromGraph(new KryoReadStage[T](kryoSupport, `class`, serializer))
- }
- final class KryoReadStage[T](kryoSupport: KryoSupport,
- `class`: Class[T],
- serializer: Serializer[_])
- extends GraphStage[FlowShape[ByteString, immutable.Seq[T]]] {
- override def shape: FlowShape[ByteString, immutable.Seq[T]] = FlowShape.of(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
- new GraphStageLogic(shape) {
- setHandler(in, new InHandler {
- override def onPush(): Unit = {
- val bytes =
- if (previousBytes.length == 0) grab(in)
- else ByteString.fromArrayUnsafe(previousBytes) ++ grab(in)
- Managed(new Input(new ByteBufferBackedInputStream(bytes.asByteBuffer))) { input =>
- var position = 0
- val acc = ListBuffer[T]()
- kryoSupport.withKryo { kryo =>
- var last = false
- while (!last && !input.eof()) {
- val tOpt = tryRead(kryo, input)
- tOpt match {
- case Some(t) =>
- acc += t
- position = input.total().toInt
- previousBytes = EmptyArray
- case None =>
- val bytesLeft = new Array[Byte](bytes.length - position)
- val bb = bytes.asByteBuffer
- bb.position(position)
- bb.get(bytesLeft)
- last = true
- previousBytes = bytesLeft
- }
- }
- push(out, acc.toList)
- }
- }
- }
- private def tryRead(kryo: Kryo, input: Input): Option[T] =
- try {
- Some(kryo.readObject(input, `class`, serializer))
- } catch {
- case _: KryoException => None
- }
- })
- setHandler(out, new OutHandler {
- override def onPull(): Unit = {
- pull(in)
- }
- })
- private val EmptyArray: Array[Byte] = Array.empty
- private var previousBytes: Array[Byte] = EmptyArray
- }
- }
- override def toString: String = "KryoReadStage"
- private lazy val in: Inlet[ByteString] = Inlet("KryoReadStage.in")
- private lazy val out: Outlet[immutable.Seq[T]] = Outlet("KryoReadStage.out")
- }
- class ByteBufferBackedInputStream(buf: ByteBuffer) extends InputStream {
- override def read: Int = {
- if (!buf.hasRemaining) -1
- else buf.get & 0xFF
- }
- override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
- if (!buf.hasRemaining) -1
- else {
- val read = Math.min(len, buf.remaining)
- buf.get(bytes, off, read)
- read
- }
- }
- }
- object Managed {
- type AutoCloseableView[T] = T => AutoCloseable
- def apply[T: AutoCloseableView, V](resource: T)(op: T => V): V =
- try {
- op(resource)
- } finally {
- resource.close()
- }
- }
- trait KryoSupport {
- def withKryo[T](f: Kryo => T): T
- }
- class PooledKryoSupport(serializers: (Class[_], Serializer[_])*) extends KryoSupport {
- override def withKryo[T](f: Kryo => T): T = {
- pool.run(new KryoCallback[T] {
- override def execute(kryo: Kryo): T = f(kryo)
- })
- }
- private val pool = {
- val factory = new KryoFactory() {
- override def create(): Kryo = {
- val kryo = new Kryo
- (KryoSupport.ScalaSerializers ++ serializers).foreach {
- case ((clazz, serializer)) =>
- kryo.register(clazz, serializer)
- }
- kryo
- }
- }
- new KryoPool.Builder(factory).softReferences().build()
- }
- }
Add Comment
Please, Sign In to add comment