Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public List<InputSplit> getSplits(JobContext arg0) throws IOException,
- InterruptedException {
- ReliableCassandraClient client = new ReliableCassandraClient(new String[]{location.getNode()}, location.getKeyspace());
- List<TokenRange> tokenRanges = client.describe_ring();
- LOG.info("Got " + tokenRanges.size() + " tokenranges:");
- for(TokenRange t : tokenRanges)
- LOG.info(" - " + t);
- //tokenRanges.add(new TokenRange(new String(new byte[]{0}), tokenRanges.get(0).end_token, tokenRanges.get(0).endpoints));
- List<InputSplit> splits = new LinkedList<InputSplit>();
- // Simple non concurrent version:
- for(TokenRange range : tokenRanges){
- LOG.info("Asking tokenrange " + range + " about it's splits.");
- // Connect to any endpoint and fetch the splits
- ReliableCassandraClient c = new ReliableCassandraClient(range.endpoints.toArray(new String[0]), location.getKeyspace());
- List<String> tokens = null;
- try {
- if(range.start_token.length() < 2)
- continue;
- //TODO remove hack!
- // if((new BigInteger(range.end_token)).compareTo(new BigInteger(range.start_token)) < 0)
- // continue;
- tokens = c.describe_splits(location.getColumnFamily(), range.start_token, range.end_token, Constansts.MAX_SPLIT_SIZE);
- } catch (Exception e1) {
- // TODO Auto-generated catch block
- throw new IOException("Unable to get split description from " + range.endpoints, e1);
- }
- LOG.info("Got " + tokens.size() + " tokens, building tokenranges:");
- String[] endpoints = range.endpoints.toArray(new String[]{});
- // hadoop needs hostname, not ip
- for (int i = 0; i < endpoints.length; i++){
- endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
- }
- for (int i = 1; i < tokens.size(); i++){
- CassandraRangeInputSplit split = new CassandraRangeInputSplit(location.keyspace, location.columnFamily, tokens.get(i - 1), tokens.get(i), endpoints);
- LOG.info(" - " + split);
- splits.add(split);
- }
- }
- assert splits.size() > 0;
- Collections.shuffle(splits, new Random(System.nanoTime()));
- return splits;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement