Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Jun 30th, 2012  |  syntax: None  |  size: 3.34 KB  |  hits: 11  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  /**
   * Companion holding the deserializer for [[FetchOffsetDetail]].
   */
  object FetchOffsetDetail {

    /**
     * Decodes one FetchOffsetDetail from the buffer's current position.
     *
     * Reads, in order: a short string (the topic), a short count followed by that
     * many ints (the partitions), then a short count followed by that many longs
     * (the offsets). The buffer's position is advanced past everything consumed.
     *
     * @param buffer source of the encoded bytes
     * @return the decoded detail
     */
    def readFrom(buffer: ByteBuffer): FetchOffsetDetail = {
      val topicName = Utils.readShortString(buffer, "UTF-8")

      // The count precedes the values; allocate first, then fill slot by slot.
      val partitionIds = new Array[Int](buffer.getShort)
      partitionIds.indices.foreach { idx => partitionIds(idx) = buffer.getInt }

      val fetchOffsets = new Array[Long](buffer.getShort)
      fetchOffsets.indices.foreach { idx => fetchOffsets(idx) = buffer.getLong }

      new FetchOffsetDetail(topicName, partitionIds, fetchOffsets)
    }
  }
  16.  
  /**
   * The per-topic part of a fetch request: a topic name together with the
   * partitions and offsets requested for it.
   *
   * @param topic      topic name, serialized as a UTF-8 short string
   * @param partitions partition ids, each serialized as a 4-byte int
   * @param offsets    offsets, each serialized as an 8-byte long
   */
  class FetchOffsetDetail(val topic: String, val partitions: Array[Int], val offsets: Array[Long]) {

    /**
     * Serializes this detail into the buffer at its current position, in the same
     * field order that FetchOffsetDetail.readFrom consumes.
     *
     * @param buffer destination buffer
     * @throws IllegalArgumentException if there are more partitions or offsets than
     *                                  fit in the 2-byte count field
     */
    def writeTo(buffer: ByteBuffer): Unit = {
      // topic
      Utils.writeShortString(buffer, topic, "UTF-8")

      // partitions
      if (partitions.length > Short.MaxValue)
        throw new IllegalArgumentException("Number of partitions in FetchRequest exceeds " + Short.MaxValue + ".")
      buffer.putShort(partitions.length.toShort)
      partitions.foreach(buffer.putInt(_))

      // offsets
      if (offsets.length > Short.MaxValue)
        throw new IllegalArgumentException("Number of offsets in FetchRequest exceeds " + Short.MaxValue + ".")
      buffer.putShort(offsets.length.toShort)
      offsets.foreach(buffer.putLong(_))
    }

    /**
     * Number of bytes writeTo produces: the topic short string, plus a 2-byte
     * count and 4 bytes per partition, plus a 2-byte count and 8 bytes per offset.
     *
     * The topic is measured in encoded UTF-8 bytes, not characters, so that the
     * size is also right for topic names containing non-ASCII characters.
     * NOTE(review): assumes Utils.writeShortString emits a 2-byte length followed
     * by the encoded bytes -- confirm against Utils.
     */
    def sizeInBytes(): Int = {
      val topicBytes = 2 + topic.getBytes("UTF-8").length
      val partitionBytes = 2 + 4 * partitions.length
      val offsetBytes = 2 + 8 * offsets.length
      topicBytes + partitionBytes + offsetBytes
    }

    /** Human-readable form; array contents are rendered rather than their JVM identity strings. */
    override def toString(): String =
      "FetchOffsetDetail(topic:" + topic +
        ", parts:" + partitions.mkString("[", ",", "]") +
        ", offsets:" + offsets.mkString("[", ",", "]") + ")"
  }
  40.  
  /**
   * Companion holding the protocol version constant and the deserializer for
   * [[FetchRequest]].
   */
  object FetchRequest {

    /** Version id of the fetch request format. */
    def CURRENT_VERSION: Short = 1.toShort

    /**
     * Decodes a FetchRequest from the buffer's current position.
     *
     * Reads, in order: correlation id (int), version id (short), client id
     * (short string), replica id, max wait and min bytes (ints), then a short
     * count followed by that many FetchOffsetDetail entries.
     *
     * @param buffer source of the encoded bytes
     * @return the decoded request
     */
    def readFrom(buffer: ByteBuffer): FetchRequest = {
      val correlation = buffer.getInt
      val version = buffer.getShort
      val client = Utils.readShortString(buffer, "UTF-8")
      val replica = buffer.getInt
      val waitLimit = buffer.getInt
      val bytesFloor = buffer.getInt

      // The entry count precedes the entries; allocate first, then decode each in turn.
      val details = new Array[FetchOffsetDetail](buffer.getShort)
      details.indices.foreach { idx => details(idx) = FetchOffsetDetail.readFrom(buffer) }

      new FetchRequest(correlation, version, client, replica, waitLimit, bytesFloor, details)
    }

  }
  60.  
  /**
   * A fetch request: a fixed header followed by a list of per-topic
   * FetchOffsetDetail entries.
   *
   * @param correlationId serialized as a 4-byte int
   * @param versionId     serialized as a 2-byte short
   * @param clientId      serialized as a UTF-8 short string
   * @param replicaId     serialized as a 4-byte int
   * @param maxWait       serialized as a 4-byte int
   * @param minBytes      serialized as a 4-byte int
   * @param offsetInfo    per-topic entries, preceded on the wire by a 2-byte count
   */
  class FetchRequest(val correlationId: Int,
                     val versionId: Short,
                     val clientId: String,
                     val replicaId: Int,
                     val maxWait: Int,
                     val minBytes: Int,
                     val offsetInfo: Array[FetchOffsetDetail]) extends Request(RequestKeys.Fetch) {

    /**
     * Serializes this request into the buffer at its current position, in the
     * field order that FetchRequest.readFrom consumes.
     *
     * @param buffer destination buffer
     * @throws IllegalArgumentException if there are more entries in offsetInfo than
     *                                  fit in the 2-byte count field
     */
    def writeTo(buffer: ByteBuffer): Unit = {
      buffer.putInt(correlationId)
      buffer.putShort(versionId)
      Utils.writeShortString(buffer, clientId, "UTF-8")
      buffer.putInt(replicaId)
      buffer.putInt(maxWait)
      buffer.putInt(minBytes)

      // readFrom expects a 2-byte entry count before the entries, and sizeInBytes
      // already accounts for it; without it the request cannot be read back.
      if (offsetInfo.length > Short.MaxValue)
        throw new IllegalArgumentException("Number of topics in FetchRequest exceeds " + Short.MaxValue + ".")
      buffer.putShort(offsetInfo.length.toShort)
      offsetInfo.foreach(_.writeTo(buffer))
    }

    /**
     * Number of bytes writeTo produces: the fixed-width header fields, the client
     * id short string, the 2-byte entry count and the size of every entry.
     *
     * The client id is measured in encoded UTF-8 bytes, not characters.
     * NOTE(review): assumes Utils.writeShortString emits a 2-byte length followed
     * by the encoded bytes -- confirm against Utils.
     */
    def sizeInBytes(): Int = {
      val clientIdBytes = 2 + clientId.getBytes("UTF-8").length
      val entriesBytes = 2 + offsetInfo.foldLeft(0)(_ + _.sizeInBytes())
      4 + 2 + clientIdBytes + 4 + 4 + 4 + entriesBytes
    }

    /** Human-readable form listing every field; offsetInfo entries are rendered individually. */
    override def toString(): String = {
      val buffer = new StringBuilder()
      buffer.append("FetchRequest(correlationId: ").append(correlationId)
        .append(", versionId: ").append(versionId)
        .append(", clientId: ").append(clientId)
        .append(", replicaId: ").append(replicaId)
        .append(", maxWait: ").append(maxWait)
        .append(", minBytes: ").append(minBytes)
        .append(", offsetInfo: ").append(offsetInfo.mkString("[", ", ", "]"))
        .append(")")
      buffer.toString()
    }
  }