1.     public void initialize(InputSplit split, TaskAttemptContext arg1)
  2.     throws IOException, InterruptedException {
  3.         rows = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
  4.         this.currentSplit = (CassandraRangeInputSplit) split;
  5.         LOG.info("Starting retrieving split " + currentSplit);
  6.         try {
  7.             SlicePredicate slicePredicate = new SlicePredicate();
  8.             SliceRange sliceRange = new SliceRange();
  9.             sliceRange.setStart(new byte[] {});
  10.             sliceRange.setFinish(new byte[] {});
  11.             slicePredicate.setSlice_range(sliceRange);
  12.             progress = 0;
  13.             // TODO issue multiple queries to allow for really large splits
  14.             KeyRange keyRange = new KeyRange(Constansts.MAX_SPLIT_SIZE)
  15.                 .setStart_token(currentSplit.getStartToken())
  16.                 .setEnd_token(currentSplit.getEndToken());
  17.             ReliableCassandraClient client = new ReliableCassandraClient(currentSplit.getLocations(), currentSplit.getKeyspace());
  18.             List<KeySlice> slices = client.get_range_slices(new ColumnParent(currentSplit.getColumnFamily()), slicePredicate, keyRange, ConsistencyLevel.ONE);
  19.             for(KeySlice k : slices){
  20.                 rows.put(k.key, k.columns);
  21.             }
  22.             keyIterator = rows.keySet().iterator();
  23.             LOG.error("Got " + rows.size() + " rows.");
  24.         } catch (IOException e) {
  25.             // TODO Auto-generated catch block
  26.             e.printStackTrace();
  27.             throw e;
  28.         }
  29.         //}
  30.     }