Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package sandbox
- import jetbrains.exodus.io.Block
- import jetbrains.exodus.io.DataReader
- import jetbrains.exodus.io.FileDataReader
- import jetbrains.exodus.io.RemoveBlockType
- import jetbrains.exodus.log.Log
- import jetbrains.exodus.log.LogUtil
- import mu.KLogging
- import software.amazon.awssdk.core.AwsRequestOverrideConfig
- import software.amazon.awssdk.core.async.AsyncResponseHandler
- import software.amazon.awssdk.services.s3.S3AsyncClient
- import software.amazon.awssdk.services.s3.model.GetObjectRequest
- import software.amazon.awssdk.services.s3.model.GetObjectResponse
- import software.amazon.awssdk.services.s3.model.ListObjectsRequest
- import software.amazon.awssdk.services.s3.model.S3Object
- import java.util.concurrent.atomic.AtomicLong
- class S3DataReader(val s3: S3AsyncClient, val bucketName: String, val requestOverrideConfig: AwsRequestOverrideConfig? = null) : DataReader {
- companion object : KLogging()
- val bytesRead = AtomicLong()
- private val blocks: Map<Long, S3Block> = s3.listObjects(ListObjectsRequest.builder()
- .requestOverrideConfig(requestOverrideConfig)
- .bucket(bucketName)
- .delimiter("/")
- //.prefix("My")
- .build()).get().contents().asSequence().filter {
- it.key().endsWith(".xd")
- }.map {
- S3Block(LogUtil.getAddress(it.key()), it)
- }.associateBy {
- it._address
- }
- @Suppress("UNCHECKED_CAST")
- override fun getBlocks(): Array<Block> {
- val files = blocks
- val size = files.size
- val result = arrayOfNulls<Block>(size)
- var i = 0
- blocks.forEach {
- result[i++] = it.value
- }
- FileDataReader.sortBlocks(result)
- return result as Array<Block>
- }
- override fun setLog(log: Log) {
- }
- override fun getLocation(): String = "s3:$bucketName"
- override fun removeBlock(blockAddress: Long, rbt: RemoveBlockType) {
- // TODO
- }
- override fun truncateBlock(blockAddress: Long, length: Long) {
- TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
- }
- override fun close() {
- // TODO
- }
- override fun getBlock(address: Long): Block {
- logger.info { "Get block at ${LogUtil.getLogFilename(address)}" }
- return blocks[address]!!
- }
- override fun clear() {
- TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
- }
- private inner class S3Block(val _address: Long, val s3Object: S3Object) : Block {
- override fun getAddress() = _address
- override fun setReadOnly(): Boolean {
- return true
- }
- override fun length(): Long = s3Object.size()
- override fun read(output: ByteArray, position: Long, count: Int): Int {
- if (count <= 0) {
- return 0
- }
- val range = "bytes=$position-${position + count - 1}"
- logger.debug { "Request range: $range" }
- val written = s3.getObject(
- GetObjectRequest.builder().range(range)
- .requestOverrideConfig(requestOverrideConfig).bucket(bucketName).key(s3Object.key()).build(),
- ByteArrayAsyncResponseHandler(output)
- ).get()
- if (written < count) {
- logger.debug { "Read underflow: expected $count, got $written" }
- }
- bytesRead.addAndGet(written.toLong())
- return written
- }
- }
- }
Add Comment
Please, Sign In to add comment