/**
 * Builds the Hadoop input splits for the configured Cassandra column family.
 *
 * Asks the cluster for its token ranges via {@code describe_ring()}, then asks
 * an endpoint of each range for finer-grained split tokens via
 * {@code describe_splits()}. Consecutive token pairs become one
 * {@link CassandraRangeInputSplit} each. The final list is shuffled so that
 * concurrent map tasks do not all start on the same endpoints.
 *
 * @param arg0 the Hadoop job context (unused by this implementation)
 * @return a non-empty, shuffled list of splits covering the ring
 * @throws IOException if split descriptions cannot be fetched from any
 *         endpoint of a range, if an endpoint hostname cannot be resolved,
 *         or if no splits were generated at all
 * @throws InterruptedException declared for the InputFormat contract
 */
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException {
    // NOTE(review): neither this client nor the per-range clients below are
    // closed — confirm whether ReliableCassandraClient holds connections that
    // need an explicit close()/release.
    ReliableCassandraClient client = new ReliableCassandraClient(
            new String[]{location.getNode()}, location.getKeyspace());
    List<TokenRange> tokenRanges = client.describe_ring();
    LOG.info("Got " + tokenRanges.size() + " tokenranges:");
    for (TokenRange t : tokenRanges) {
        LOG.info(" - " + t);
    }
    List<InputSplit> splits = new LinkedList<InputSplit>();
    // Simple non-concurrent version: query each token range sequentially.
    for (TokenRange range : tokenRanges) {
        LOG.info("Asking tokenrange " + range + " about it's splits.");
        // Connect to any endpoint of this range and fetch its sub-splits.
        ReliableCassandraClient c = new ReliableCassandraClient(
                range.endpoints.toArray(new String[0]), location.getKeyspace());
        List<String> tokens = null;
        try {
            // NOTE(review): marked "hack" in the original — degenerate ranges
            // whose start token is shorter than 2 chars are skipped entirely;
            // confirm why these must be ignored.
            if (range.start_token.length() < 2) {
                continue;
            }
            tokens = c.describe_splits(location.getColumnFamily(),
                    range.start_token, range.end_token, Constansts.MAX_SPLIT_SIZE);
        } catch (Exception e1) {
            // Preserve the cause so the failing endpoint list stays visible upstream.
            throw new IOException("Unable to get split description from " + range.endpoints, e1);
        }
        LOG.info("Got " + tokens.size() + " tokens, building tokenranges:");
        String[] endpoints = range.endpoints.toArray(new String[]{});
        // Hadoop wants hostnames (not IPs) for data-locality scheduling.
        for (int i = 0; i < endpoints.length; i++) {
            endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
        }
        // Each pair of consecutive tokens delimits one split of this range.
        for (int i = 1; i < tokens.size(); i++) {
            CassandraRangeInputSplit split = new CassandraRangeInputSplit(
                    location.keyspace, location.columnFamily,
                    tokens.get(i - 1), tokens.get(i), endpoints);
            LOG.info(" - " + split);
            splits.add(split);
        }
    }
    // Fail loudly instead of 'assert' (which is disabled without -ea): an
    // empty split list would otherwise make the job silently do no work.
    if (splits.isEmpty()) {
        throw new IOException("No input splits generated for keyspace "
                + location.getKeyspace() + ", column family " + location.getColumnFamily());
    }
    // Randomize split order so parallel tasks spread load across endpoints.
    Collections.shuffle(splits, new Random(System.nanoTime()));
    return splits;
}