Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public void initialize(InputSplit split, TaskAttemptContext arg1)
- throws IOException, InterruptedException {
- rows = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
- this.currentSplit = (CassandraRangeInputSplit) split;
- LOG.info("Starting retrieving split " + currentSplit);
- try {
- SlicePredicate slicePredicate = new SlicePredicate();
- SliceRange sliceRange = new SliceRange();
- sliceRange.setStart(new byte[] {});
- sliceRange.setFinish(new byte[] {});
- slicePredicate.setSlice_range(sliceRange);
- progress = 0;
- // TODO issue multiple queries to allow for really large splits
- KeyRange keyRange = new KeyRange(Constansts.MAX_SPLIT_SIZE)
- .setStart_token(currentSplit.getStartToken())
- .setEnd_token(currentSplit.getEndToken());
- ReliableCassandraClient client = new ReliableCassandraClient(currentSplit.getLocations(), currentSplit.getKeyspace());
- List<KeySlice> slices = client.get_range_slices(new ColumnParent(currentSplit.getColumnFamily()), slicePredicate, keyRange, ConsistencyLevel.ONE);
- for(KeySlice k : slices){
- rows.put(k.key, k.columns);
- }
- keyIterator = rows.keySet().iterator();
- LOG.error("Got " + rows.size() + " rows.");
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- throw e;
- }
- //}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement