/**
 * Prepares this reader for the given split by loading the split's rows into memory.
 *
 * <p>Issues a single {@code get_range_slices} call for the token range described by the
 * split, stores every returned key with its columns in {@code rows}, resets
 * {@code progress} to 0 and points {@code keyIterator} at the loaded keys.
 *
 * @param split the split to read; it is cast to {@code CassandraRangeInputSplit}
 * @param arg1 the task attempt context; not used by this method
 * @throws IOException if an operation in this method fails with an I/O error; the
 *         exception is propagated unchanged to the caller
 * @throws InterruptedException declared by the signature; may be propagated from the
 *         calls made here
 */
public void initialize(InputSplit split, TaskAttemptContext arg1)
        throws IOException, InterruptedException {
    // Note: HashMap iteration order is unspecified, so keys are not necessarily
    // visited in the order the slices were returned.
    rows = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
    this.currentSplit = (CassandraRangeInputSplit) split;
    LOG.info("Starting retrieving split " + currentSplit);

    // Slice range with empty start and finish bounds.
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[] {});
    sliceRange.setFinish(new byte[] {});
    SlicePredicate slicePredicate = new SlicePredicate();
    slicePredicate.setSlice_range(sliceRange);

    progress = 0;

    // TODO issue multiple queries to allow for really large splits
    // NOTE(review): the KeyRange constructor argument is presumably the maximum number
    // of rows returned by the single query below - confirm against the Thrift API.
    KeyRange keyRange = new KeyRange(Constansts.MAX_SPLIT_SIZE)
            .setStart_token(currentSplit.getStartToken())
            .setEnd_token(currentSplit.getEndToken());

    ReliableCassandraClient client =
            new ReliableCassandraClient(currentSplit.getLocations(), currentSplit.getKeyspace());
    List<KeySlice> slices = client.get_range_slices(
            new ColumnParent(currentSplit.getColumnFamily()),
            slicePredicate,
            keyRange,
            ConsistencyLevel.ONE);

    for (KeySlice slice : slices) {
        rows.put(slice.key, slice.columns);
    }
    keyIterator = rows.keySet().iterator();

    // Successful completion is informational, not an error.
    LOG.info("Got " + rows.size() + " rows.");
}