public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException, InterruptedException { rows = new HashMap>(); this.currentSplit = (CassandraRangeInputSplit) split; LOG.info("Starting retrieving split " + currentSplit); try { SlicePredicate slicePredicate = new SlicePredicate(); SliceRange sliceRange = new SliceRange(); sliceRange.setStart(new byte[] {}); sliceRange.setFinish(new byte[] {}); slicePredicate.setSlice_range(sliceRange); progress = 0; // TODO issue multiple queries to allow for really large splits KeyRange keyRange = new KeyRange(Constansts.MAX_SPLIT_SIZE) .setStart_token(currentSplit.getStartToken()) .setEnd_token(currentSplit.getEndToken()); ReliableCassandraClient client = new ReliableCassandraClient(currentSplit.getLocations(), currentSplit.getKeyspace()); List slices = client.get_range_slices(new ColumnParent(currentSplit.getColumnFamily()), slicePredicate, keyRange, ConsistencyLevel.ONE); for(KeySlice k : slices){ rows.put(k.key, k.columns); } keyIterator = rows.keySet().iterator(); LOG.error("Got " + rows.size() + " rows."); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); throw e; } //} }