/** Companion holding the wire-format decoder for FetchOffsetDetail. */
object FetchOffsetDetail {

  /**
   * Decodes a single FetchOffsetDetail starting at the buffer's current position.
   *
   * Fields are consumed in this order: the topic (via Utils.readShortString, "UTF-8"),
   * a 16-bit partition count followed by that many 32-bit partition ids, then a
   * 16-bit offset count followed by that many 64-bit offsets.
   *
   * @param buffer buffer to read from; the relative reads advance its position
   * @return the decoded detail
   */
  def readFrom(buffer: ByteBuffer): FetchOffsetDetail = {
    val topicName = Utils.readShortString(buffer, "UTF-8")

    // Each array is sized by the 16-bit count that immediately precedes its elements.
    val partitionIds = new Array[Int](buffer.getShort)
    partitionIds.indices.foreach(slot => partitionIds(slot) = buffer.getInt)

    val fetchOffsets = new Array[Long](buffer.getShort)
    fetchOffsets.indices.foreach(slot => fetchOffsets(slot) = buffer.getLong)

    new FetchOffsetDetail(topicName, partitionIds, fetchOffsets)
  }
}
/**
 * One topic entry of a fetch request: a topic name, an array of partition ids and
 * an array of offsets.
 *
 * NOTE(review): partitions(i) presumably pairs with offsets(i); nothing in this class
 * enforces that the two arrays have the same length -- confirm against callers.
 *
 * @param topic      topic name, serialized as a UTF-8 short string
 * @param partitions partition ids, each serialized as a 32-bit int
 * @param offsets    offsets, each serialized as a 64-bit long
 */
class FetchOffsetDetail(val topic: String, val partitions: Array[Int], val offsets: Array[Long]) {

  /**
   * Serializes this detail into the buffer: topic, 16-bit partition count, the
   * partition ids, 16-bit offset count, the offsets.
   *
   * @param buffer destination buffer; the relative puts advance its position
   * @throws IllegalArgumentException if either array has more than Short.MaxValue
   *                                  elements, since the count is written as a short
   */
  def writeTo(buffer: ByteBuffer): Unit = {
    // topic
    Utils.writeShortString(buffer, topic, "UTF-8")

    // partitions
    if(partitions.length > Short.MaxValue)
      throw new IllegalArgumentException("Number of partitions in FetchRequest exceeds " + Short.MaxValue + ".")
    buffer.putShort(partitions.length.toShort)
    partitions.foreach(buffer.putInt(_))

    // offsets
    if(offsets.length > Short.MaxValue)
      throw new IllegalArgumentException("Number of offsets in FetchRequest exceeds " + Short.MaxValue + ".")
    buffer.putShort(offsets.length.toShort)
    offsets.foreach(buffer.putLong(_))
  }

  /**
   * Number of bytes writeTo emits for this detail.
   *
   * The topic is measured by its UTF-8 encoded byte length rather than its character
   * count, so multi-byte topic names are sized correctly.
   * NOTE(review): assumes Utils.writeShortString writes a 2-byte length prefix followed
   * by the UTF-8 encoded bytes -- confirm against Utils.
   */
  def sizeInBytes(): Int =
    (2 + topic.getBytes("UTF-8").length) +   // short-string length prefix + encoded topic
    (2 + 4 * partitions.length) +            // partition count + 32-bit ids
    (2 + 8 * offsets.length)                 // offset count + 64-bit offsets

  /** Renders the array contents; concatenating an Array directly would only print its identity. */
  override def toString(): String =
    "FetchOffsetDetail(topic:" + topic +
      ", parts:" + partitions.mkString("[", ",", "]") +
      ", offsets:" + offsets.mkString("[", ",", "]") + ")"
}
/** Companion holding the version constant and wire-format decoder for FetchRequest. */
object FetchRequest {

  /**
   * The value 1 as a Short.
   * NOTE(review): presumably the versionId callers pass when building a request -- confirm.
   */
  def CURRENT_VERSION: Short = 1.toShort

  /**
   * Decodes a FetchRequest starting at the buffer's current position.
   *
   * Fields are consumed in this order: correlationId (int), versionId (short),
   * clientId (via Utils.readShortString, "UTF-8"), replicaId (int), maxWait (int),
   * minBytes (int), a 16-bit entry count, then that many FetchOffsetDetail entries.
   *
   * @param buffer buffer to read from; the relative reads advance its position
   * @return the decoded request
   */
  def readFrom(buffer: ByteBuffer): FetchRequest = {
    val correlation = buffer.getInt
    val version = buffer.getShort
    val client = Utils.readShortString(buffer, "UTF-8")
    val replica = buffer.getInt
    val waitLimit = buffer.getInt
    val bytesFloor = buffer.getInt

    // The 16-bit count sizes the array; every entry is then decoded in order.
    val details = new Array[FetchOffsetDetail](buffer.getShort)
    details.indices.foreach(slot => details(slot) = FetchOffsetDetail.readFrom(buffer))

    new FetchRequest(correlation, version, client, replica, waitLimit, bytesFloor, details)
  }
}
/**
 * A fetch request, registered with the Request base class under RequestKeys.Fetch.
 *
 * @param correlationId serialized as a 32-bit int
 * @param versionId     serialized as a 16-bit short
 * @param clientId      serialized as a UTF-8 short string
 * @param replicaId     serialized as a 32-bit int
 * @param maxWait       serialized as a 32-bit int
 * @param minBytes      serialized as a 32-bit int
 * @param offsetInfo    per-topic entries, serialized after a 16-bit entry count
 */
class FetchRequest(val correlationId: Int,
                   val versionId: Short,
                   val clientId: String,
                   val replicaId: Int,
                   val maxWait: Int,
                   val minBytes: Int,
                   val offsetInfo: Array[FetchOffsetDetail]) extends Request(RequestKeys.Fetch) {

  /**
   * Serializes this request in the exact field order FetchRequest.readFrom consumes.
   *
   * @param buffer destination buffer; the relative puts advance its position
   * @throws IllegalArgumentException if offsetInfo has more than Short.MaxValue entries,
   *                                  since the count is written as a short
   */
  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putInt(correlationId)
    buffer.putShort(versionId)
    Utils.writeShortString(buffer, clientId, "UTF-8")
    buffer.putInt(replicaId)
    buffer.putInt(maxWait)
    buffer.putInt(minBytes)

    // The entry count must precede the entries: readFrom reads a short here and
    // sizeInBytes already budgets 2 bytes for it.
    if(offsetInfo.length > Short.MaxValue)
      throw new IllegalArgumentException("Number of topics in FetchRequest exceeds " + Short.MaxValue + ".")
    buffer.putShort(offsetInfo.length.toShort)
    offsetInfo.foreach(_.writeTo(buffer))
  }

  /**
   * Number of bytes writeTo emits for this request.
   *
   * The clientId is measured by its UTF-8 encoded byte length rather than its
   * character count, so multi-byte client ids are sized correctly.
   * NOTE(review): assumes Utils.writeShortString writes a 2-byte length prefix followed
   * by the UTF-8 encoded bytes -- confirm against Utils.
   */
  def sizeInBytes(): Int =
    4 +                                           // correlationId
    2 +                                           // versionId
    (2 + clientId.getBytes("UTF-8").length) +     // short-string length prefix + encoded clientId
    4 +                                           // replicaId
    4 +                                           // maxWait
    4 +                                           // minBytes
    (2 + offsetInfo.foldLeft(0)(_ + _.sizeInBytes()))  // entry count + entries

  /** Renders every field; offsetInfo is expanded element by element instead of printing the array identity. */
  override def toString(): String = {
    val text = new StringBuilder()
    text.append("FetchRequest(correlationId: ").append(correlationId)
      .append(", versionId: ").append(versionId)
      .append(", clientId: ").append(clientId)
      .append(", replicaId: ").append(replicaId)
      .append(", maxWait: ").append(maxWait)
      .append(", minBytes: ").append(minBytes)
      .append(", offsetInfo: ").append(offsetInfo.mkString("[", ", ", "]"))
    text.toString()
  }
}