Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

Untitled

By: a guest on Dec 11th, 2010  |  syntax: Java  |  size: 2.15 KB  |  views: 53  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
This paste has a previous version, view the difference. Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1.                 public List<InputSplit> getSplits(JobContext arg0) throws IOException,
  2.                 InterruptedException {
  3.                         ReliableCassandraClient client = new ReliableCassandraClient(new String[]{location.getNode()}, location.getKeyspace());
  4.                                 List<TokenRange> tokenRanges = client.describe_ring();
  5.                                 LOG.info("Got " + tokenRanges.size() + " tokenranges:");
  6.                                 for(TokenRange t : tokenRanges)
  7.                                         LOG.info(" - " + t);
  8.                                
  9.                                 //tokenRanges.add(new TokenRange(new String(new byte[]{0}), tokenRanges.get(0).end_token, tokenRanges.get(0).endpoints));
  10.                                 List<InputSplit> splits = new LinkedList<InputSplit>();
  11.  
  12.                                 // Simple non concurrent version:
  13.                                 for(TokenRange range : tokenRanges){
  14.                                         LOG.info("Asking tokenrange " + range + " about it's splits.");
  15.                                         // Connect to any endpoint and fetch the splits
  16.                                         ReliableCassandraClient c = new ReliableCassandraClient(range.endpoints.toArray(new String[0]), location.getKeyspace());
  17.                                         List<String> tokens = null;
  18.                                         try {
  19.                                                 if(range.start_token.length() < 2)
  20.                                                         continue;
  21.                                                 //TODO remove hack!
  22. //                                              if((new BigInteger(range.end_token)).compareTo(new BigInteger(range.start_token)) < 0)
  23. //                                                      continue;
  24.                                                 tokens = c.describe_splits(location.getColumnFamily(), range.start_token, range.end_token, Constansts.MAX_SPLIT_SIZE);
  25.                                         } catch (Exception e1) {
  26.                                                 // TODO Auto-generated catch block
  27.                                                 throw new IOException("Unable to get split description from " + range.endpoints, e1);
  28.                                         }
  29.                                         LOG.info("Got " + tokens.size() + " tokens, building tokenranges:");
  30.                                         String[] endpoints = range.endpoints.toArray(new String[]{});
  31.                                         // hadoop needs hostname, not ip
  32.                                         for (int i = 0; i < endpoints.length; i++){
  33.                                                 endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
  34.                                         }
  35.                                         for (int i = 1; i < tokens.size(); i++){
  36.                                                 CassandraRangeInputSplit split = new CassandraRangeInputSplit(location.keyspace, location.columnFamily, tokens.get(i - 1), tokens.get(i), endpoints);
  37.                                                 LOG.info(" - " + split);
  38.                                                 splits.add(split);
  39.                                         }
  40.                                 }
  41.  
  42.                                 assert splits.size() > 0;
  43.                                 Collections.shuffle(splits, new Random(System.nanoTime()));
  44.                                 return splits;
  45.                 }