View difference between Paste ID: dsdMGZjW and
SHOW: | | - or go back to the newest paste.
1-
1+
		public List<InputSplit> getSplits(JobContext arg0) throws IOException,
2
		InterruptedException {
3
			ReliableCassandraClient client = new ReliableCassandraClient(new String[]{location.getNode()}, location.getKeyspace());
4
				List<TokenRange> tokenRanges = client.describe_ring();
5
				LOG.info("Got " + tokenRanges.size() + " tokenranges:");
6
				for(TokenRange t : tokenRanges)
7
					LOG.info(" - " + t);
8
				
9
				//tokenRanges.add(new TokenRange(new String(new byte[]{0}), tokenRanges.get(0).end_token, tokenRanges.get(0).endpoints));
10
				List<InputSplit> splits = new LinkedList<InputSplit>();
11
12
				// Simple non concurrent version:
13
				for(TokenRange range : tokenRanges){
14
					LOG.info("Asking tokenrange " + range + " about it's splits.");
15
					// Connect to any endpoint and fetch the splits
16
					ReliableCassandraClient c = new ReliableCassandraClient(range.endpoints.toArray(new String[0]), location.getKeyspace());
17
					List<String> tokens = null;
18
					try {
19
						if(range.start_token.length() < 2)
20
							continue;
21
						//TODO remove hack!
22
//						if((new BigInteger(range.end_token)).compareTo(new BigInteger(range.start_token)) < 0)
23
//							continue;
24
						tokens = c.describe_splits(location.getColumnFamily(), range.start_token, range.end_token, Constansts.MAX_SPLIT_SIZE);
25
					} catch (Exception e1) {
26
						// TODO Auto-generated catch block
27
						throw new IOException("Unable to get split description from " + range.endpoints, e1);
28
					}
29
					LOG.info("Got " + tokens.size() + " tokens, building tokenranges:");
30
					String[] endpoints = range.endpoints.toArray(new String[]{});
31
					// hadoop needs hostname, not ip
32
					for (int i = 0; i < endpoints.length; i++){
33
						endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
34
					}
35
					for (int i = 1; i < tokens.size(); i++){
36
						CassandraRangeInputSplit split = new CassandraRangeInputSplit(location.keyspace, location.columnFamily, tokens.get(i - 1), tokens.get(i), endpoints);
37
						LOG.info(" - " + split);
38
						splits.add(split);
39
					}
40
				}
41
42
				assert splits.size() > 0;
43
				Collections.shuffle(splits, new Random(System.nanoTime()));
44
				return splits;
45
		}