Advertisement
Guest User

Untitled

a guest
Dec 11th, 2010
146
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.15 KB | None | 0 0
  1.         public List<InputSplit> getSplits(JobContext arg0) throws IOException,
  2.         InterruptedException {
  3.             ReliableCassandraClient client = new ReliableCassandraClient(new String[]{location.getNode()}, location.getKeyspace());
  4.                 List<TokenRange> tokenRanges = client.describe_ring();
  5.                 LOG.info("Got " + tokenRanges.size() + " tokenranges:");
  6.                 for(TokenRange t : tokenRanges)
  7.                     LOG.info(" - " + t);
  8.                
  9.                 //tokenRanges.add(new TokenRange(new String(new byte[]{0}), tokenRanges.get(0).end_token, tokenRanges.get(0).endpoints));
  10.                 List<InputSplit> splits = new LinkedList<InputSplit>();
  11.  
  12.                 // Simple non concurrent version:
  13.                 for(TokenRange range : tokenRanges){
  14.                     LOG.info("Asking tokenrange " + range + " about it's splits.");
  15.                     // Connect to any endpoint and fetch the splits
  16.                     ReliableCassandraClient c = new ReliableCassandraClient(range.endpoints.toArray(new String[0]), location.getKeyspace());
  17.                     List<String> tokens = null;
  18.                     try {
  19.                         if(range.start_token.length() < 2)
  20.                             continue;
  21.                         //TODO remove hack!
  22. //                      if((new BigInteger(range.end_token)).compareTo(new BigInteger(range.start_token)) < 0)
  23. //                          continue;
  24.                         tokens = c.describe_splits(location.getColumnFamily(), range.start_token, range.end_token, Constansts.MAX_SPLIT_SIZE);
  25.                     } catch (Exception e1) {
  26.                         // TODO Auto-generated catch block
  27.                         throw new IOException("Unable to get split description from " + range.endpoints, e1);
  28.                     }
  29.                     LOG.info("Got " + tokens.size() + " tokens, building tokenranges:");
  30.                     String[] endpoints = range.endpoints.toArray(new String[]{});
  31.                     // hadoop needs hostname, not ip
  32.                     for (int i = 0; i < endpoints.length; i++){
  33.                         endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
  34.                     }
  35.                     for (int i = 1; i < tokens.size(); i++){
  36.                         CassandraRangeInputSplit split = new CassandraRangeInputSplit(location.keyspace, location.columnFamily, tokens.get(i - 1), tokens.get(i), endpoints);
  37.                         LOG.info(" - " + split);
  38.                         splits.add(split);
  39.                     }
  40.                 }
  41.  
  42.                 assert splits.size() > 0;
  43.                 Collections.shuffle(splits, new Random(System.nanoTime()));
  44.                 return splits;
  45.         }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement