import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import org.apache.cassandra.thrift.TokenRange;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    // Ask the configured node for the ring topology of the keyspace.
    ReliableCassandraClient client =
            new ReliableCassandraClient(new String[]{location.getNode()}, location.getKeyspace());
    List<TokenRange> tokenRanges = client.describe_ring();
    LOG.info("Got " + tokenRanges.size() + " token ranges:");
    for (TokenRange t : tokenRanges) {
        LOG.info(" - " + t);
    }
    //tokenRanges.add(new TokenRange(new String(new byte[]{0}), tokenRanges.get(0).end_token, tokenRanges.get(0).endpoints));

    List<InputSplit> splits = new LinkedList<InputSplit>();

    // Simple, non-concurrent version: query each token range sequentially.
    for (TokenRange range : tokenRanges) {
        LOG.info("Asking token range " + range + " about its splits.");

        // Connect to any endpoint that owns the range and fetch its split tokens.
        ReliableCassandraClient c =
                new ReliableCassandraClient(range.endpoints.toArray(new String[0]), location.getKeyspace());
        List<String> tokens = null;
        try {
            if (range.start_token.length() < 2) {
                continue; // TODO remove hack!
            }
            // if ((new BigInteger(range.end_token)).compareTo(new BigInteger(range.start_token)) < 0)
            //     continue;
            tokens = c.describe_splits(location.getColumnFamily(), range.start_token,
                    range.end_token, Constansts.MAX_SPLIT_SIZE);
        } catch (Exception e1) {
            throw new IOException("Unable to get split description from " + range.endpoints, e1);
        }
        LOG.info("Got " + tokens.size() + " tokens, building token ranges:");

        // Hadoop needs hostnames for locality, not IP addresses.
        String[] endpoints = range.endpoints.toArray(new String[0]);
        for (int i = 0; i < endpoints.length; i++) {
            endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
        }

        // Each pair of consecutive tokens delimits one input split.
        for (int i = 1; i < tokens.size(); i++) {
            CassandraRangeInputSplit split = new CassandraRangeInputSplit(location.keyspace,
                    location.columnFamily, tokens.get(i - 1), tokens.get(i), endpoints);
            LOG.info(" - " + split);
            splits.add(split);
        }
    }

    assert splits.size() > 0;
    // Shuffle so the map tasks are not scheduled against the ring in order.
    Collections.shuffle(splits, new Random(System.nanoTime()));
    return splits;
}
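The CassandraRangeInputSplit constructed above is not shown here. As a rough sketch of what Hadoop expects from such a split class, the following is a hypothetical implementation, not the author's actual code: the field names and the five-argument constructor mirror the call in getSplits, while the serialization format and the constant getLength() are assumptions.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// Hypothetical sketch of the split class used above; the real implementation may differ.
public class CassandraRangeInputSplit extends InputSplit implements Writable {

    private String keyspace;
    private String columnFamily;
    private String startToken;
    private String endToken;
    private String[] endpoints; // hostnames, used by Hadoop for data locality

    // Required by Hadoop: splits are instantiated reflectively, then deserialized via readFields().
    public CassandraRangeInputSplit() {}

    public CassandraRangeInputSplit(String keyspace, String columnFamily,
                                    String startToken, String endToken, String[] endpoints) {
        this.keyspace = keyspace;
        this.columnFamily = columnFamily;
        this.startToken = startToken;
        this.endToken = endToken;
        this.endpoints = endpoints;
    }

    @Override
    public long getLength() {
        // The real size is unknown without asking Cassandra; a constant keeps the scheduler happy.
        return Long.MAX_VALUE;
    }

    @Override
    public String[] getLocations() {
        return endpoints;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(keyspace);
        out.writeUTF(columnFamily);
        out.writeUTF(startToken);
        out.writeUTF(endToken);
        out.writeInt(endpoints.length);
        for (String endpoint : endpoints) {
            out.writeUTF(endpoint);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        keyspace = in.readUTF();
        columnFamily = in.readUTF();
        startToken = in.readUTF();
        endToken = in.readUTF();
        endpoints = new String[in.readInt()];
        for (int i = 0; i < endpoints.length; i++) {
            endpoints[i] = in.readUTF();
        }
    }

    @Override
    public String toString() {
        return keyspace + "/" + columnFamily + " [" + startToken + ", " + endToken + ") @ "
                + Arrays.toString(endpoints);
    }
}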