View difference between Paste ID: dsdMGZjW and
SHOW:
|
|
- or go back to the newest paste.
1 | - | |
1 | + | public List<InputSplit> getSplits(JobContext arg0) throws IOException, |
2 | InterruptedException { | |
3 | ReliableCassandraClient client = new ReliableCassandraClient(new String[]{location.getNode()}, location.getKeyspace()); | |
4 | List<TokenRange> tokenRanges = client.describe_ring(); | |
5 | LOG.info("Got " + tokenRanges.size() + " tokenranges:"); | |
6 | for(TokenRange t : tokenRanges) | |
7 | LOG.info(" - " + t); | |
8 | ||
9 | //tokenRanges.add(new TokenRange(new String(new byte[]{0}), tokenRanges.get(0).end_token, tokenRanges.get(0).endpoints)); | |
10 | List<InputSplit> splits = new LinkedList<InputSplit>(); | |
11 | ||
12 | // Simple non concurrent version: | |
13 | for(TokenRange range : tokenRanges){ | |
14 | LOG.info("Asking tokenrange " + range + " about it's splits."); | |
15 | // Connect to any endpoint and fetch the splits | |
16 | ReliableCassandraClient c = new ReliableCassandraClient(range.endpoints.toArray(new String[0]), location.getKeyspace()); | |
17 | List<String> tokens = null; | |
18 | try { | |
19 | if(range.start_token.length() < 2) | |
20 | continue; | |
21 | //TODO remove hack! | |
22 | // if((new BigInteger(range.end_token)).compareTo(new BigInteger(range.start_token)) < 0) | |
23 | // continue; | |
24 | tokens = c.describe_splits(location.getColumnFamily(), range.start_token, range.end_token, Constansts.MAX_SPLIT_SIZE); | |
25 | } catch (Exception e1) { | |
26 | // TODO Auto-generated catch block | |
27 | throw new IOException("Unable to get split description from " + range.endpoints, e1); | |
28 | } | |
29 | LOG.info("Got " + tokens.size() + " tokens, building tokenranges:"); | |
30 | String[] endpoints = range.endpoints.toArray(new String[]{}); | |
31 | // hadoop needs hostname, not ip | |
32 | for (int i = 0; i < endpoints.length; i++){ | |
33 | endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName(); | |
34 | } | |
35 | for (int i = 1; i < tokens.size(); i++){ | |
36 | CassandraRangeInputSplit split = new CassandraRangeInputSplit(location.keyspace, location.columnFamily, tokens.get(i - 1), tokens.get(i), endpoints); | |
37 | LOG.info(" - " + split); | |
38 | splits.add(split); | |
39 | } | |
40 | } | |
41 | ||
42 | assert splits.size() > 0; | |
43 | Collections.shuffle(splits, new Random(System.nanoTime())); | |
44 | return splits; | |
45 | } |