Guest User

Untitled

a guest
May 20th, 2018
139
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.46 KB | None | 0 0
  1. package sandbox
  2.  
  3. import jetbrains.exodus.io.Block
  4. import jetbrains.exodus.io.DataReader
  5. import jetbrains.exodus.io.FileDataReader
  6. import jetbrains.exodus.io.RemoveBlockType
  7. import jetbrains.exodus.log.Log
  8. import jetbrains.exodus.log.LogUtil
  9. import mu.KLogging
  10. import software.amazon.awssdk.core.AwsRequestOverrideConfig
  11. import software.amazon.awssdk.core.async.AsyncResponseHandler
  12. import software.amazon.awssdk.services.s3.S3AsyncClient
  13. import software.amazon.awssdk.services.s3.model.GetObjectRequest
  14. import software.amazon.awssdk.services.s3.model.GetObjectResponse
  15. import software.amazon.awssdk.services.s3.model.ListObjectsRequest
  16. import software.amazon.awssdk.services.s3.model.S3Object
  17. import java.util.concurrent.atomic.AtomicLong
  18.  
  /**
   * A [DataReader] that serves Xodus log blocks from objects stored in an S3 bucket.
   *
   * The bucket is listed once, while the instance is being constructed; every top-level
   * object whose key ends with `.xd` becomes one block, keyed by the address that
   * [LogUtil.getAddress] derives from the key. Blocks are fetched with ranged GET requests.
   *
   * @property s3 client used for both listing and ranged reads.
   * @property bucketName bucket that holds the `.xd` objects.
   * @property requestOverrideConfig optional per-request configuration passed to every S3 call.
   */
  class S3DataReader(
      val s3: S3AsyncClient,
      val bucketName: String,
      val requestOverrideConfig: AwsRequestOverrideConfig? = null
  ) : DataReader {

      /** Running total of the byte counts returned by block reads on this reader. */
      val bytesRead = AtomicLong()

      // Snapshot of the bucket taken at construction time; it is never refreshed afterwards.
      private val blocks: Map<Long, S3Block> = listAllObjects()
          .filter { it.key().endsWith(".xd") }
          .map { S3Block(LogUtil.getAddress(it.key()), it) }
          .associateBy { it.blockAddress }

      /**
       * Lists the top-level objects of [bucketName], following continuation markers.
       *
       * A single ListObjects response is capped by S3 (1000 keys by default), so the request is
       * repeated for as long as the response reports itself truncated. Otherwise blocks beyond
       * the first page would silently be missing from [blocks].
       */
      private fun listAllObjects(): List<S3Object> {
          val objects = mutableListOf<S3Object>()
          var marker: String? = null
          do {
              val response = s3.listObjects(
                  ListObjectsRequest.builder()
                      .requestOverrideConfig(requestOverrideConfig)
                      .bucket(bucketName)
                      .delimiter("/")
                      .marker(marker) // null on the first page, i.e. start from the beginning
                      .build()
              ).get()
              val page = response.contents().orEmpty()
              objects += page
              marker = if (response.isTruncated() == true) {
                  // Fall back to the last key of the page when S3 does not supply a next marker.
                  response.nextMarker() ?: page.lastOrNull()?.key()
              } else {
                  null
              }
          } while (marker != null)
          return objects
      }

      /** Returns all known blocks, ordered by [FileDataReader.sortBlocks]. */
      override fun getBlocks(): Array<Block> {
          val result = blocks.values.toTypedArray<Block>()
          FileDataReader.sortBlocks(result)
          return result
      }

      /** Intentionally a no-op: this reader keeps no reference to the log. */
      override fun setLog(log: Log) {
      }

      override fun getLocation(): String = "s3:$bucketName"

      /** Currently a no-op; nothing is deleted from the bucket. */
      override fun removeBlock(blockAddress: Long, rbt: RemoveBlockType) {
          // TODO
      }

      /** Not supported yet; always throws [NotImplementedError]. */
      override fun truncateBlock(blockAddress: Long, length: Long) {
          TODO("not implemented")
      }

      /** Currently a no-op; the S3 client is left open. */
      override fun close() {
          // TODO
      }

      /**
       * Returns the block that starts at [address].
       *
       * @throws IllegalStateException if no `.xd` object with that address was found when the
       * bucket was listed.
       */
      override fun getBlock(address: Long): Block {
          logger.info { "Get block at ${LogUtil.getLogFilename(address)}" }
          return checkNotNull(blocks[address]) {
              "No block ${LogUtil.getLogFilename(address)} (address $address) in bucket $bucketName"
          }
      }

      /** Not supported yet; always throws [NotImplementedError]. */
      override fun clear() {
          TODO("not implemented")
      }

      /** A single `.xd` object exposed as a block whose contents are read via ranged GETs. */
      private inner class S3Block(val blockAddress: Long, val s3Object: S3Object) : Block {
          override fun getAddress() = blockAddress

          override fun setReadOnly(): Boolean = true

          /** Object size as reported by the bucket listing. */
          override fun length(): Long = s3Object.size()

          /**
           * Requests [count] bytes starting at [position] and returns the number obtained.
           *
           * Returns 0 without contacting S3 when [count] is not positive. The returned count is
           * also added to [bytesRead].
           */
          override fun read(output: ByteArray, position: Long, count: Int): Int {
              if (count <= 0) {
                  return 0
              }

              // HTTP byte ranges are inclusive at both ends, hence the "- 1".
              val range = "bytes=$position-${position + count - 1}"
              logger.debug { "Request range: $range" }

              val request = GetObjectRequest.builder()
                  .range(range)
                  .requestOverrideConfig(requestOverrideConfig)
                  .bucket(bucketName)
                  .key(s3Object.key())
                  .build()
              // NOTE(review): ByteArrayAsyncResponseHandler is declared outside this file; it is
              // assumed to copy the response body into `output` and yield the byte count - confirm.
              val written = s3.getObject(request, ByteArrayAsyncResponseHandler(output)).get()

              if (written < count) {
                  logger.debug { "Read underflow: expected $count, got $written" }
              }

              bytesRead.addAndGet(written.toLong())

              return written
          }
      }

      companion object : KLogging()
  }
Add Comment
Please, Sign In to add comment