Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- MemoryRecords partitionRecords = (MemoryRecords) partitionData.records();
- for (RecordBatch batch : partitionRecords.batches()) {
- Buffer batchBuffer = Buffer.buffer();
- Iterator<org.apache.kafka.common.record.Record> it = batch.iterator();
- while (it.hasNext()) {
- org.apache.kafka.common.record.Record record = it.next();
- String k = "";
- String v = "";
- for (Header header : record.headers()) {
- v = new String(header.value());
- // Some logic with k and v to write to a Buffer
- }
- if (record.hasKey()) {
- ByteBuffer keyBuffer = record.key();
- ByteBuffer valueBuffer = record.value();
- if (record.hasValue()) {
- k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
- v = new String(valueBuffer.array(), valueBuffer.position(), record.valueSize());
- // Some logic with k and v to write to a Buffer
- } else {
- k = new String(keyBuffer.array(), keyBuffer.position(), record.keySize());
- // Some logic with k and v to write to a Buffer
- }
- } else {
- // Empty buffer
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement