Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
- index b0bf427..1602607 100644
- --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
- +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
- @@ -87,6 +87,10 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
- {
- + // only need to connect once
- + if (socket != null && socket.isOpen())
- + return;
- +
- this.split = (ColumnFamilySplit) split;
- Configuration conf = context.getConfiguration();
- predicate = ConfigHelper.getInputSlicePredicate(conf);
- @@ -98,33 +102,38 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
- keyspace = ConfigHelper.getInputKeyspace(conf);
- - try
- + List<String> locations = getLocations();
- + for(String location : locations)
- {
- - // only need to connect once
- - if (socket != null && socket.isOpen())
- - return;
- -
- - // create connection using thrift
- - String location = getLocation();
- - socket = new TSocket(location, ConfigHelper.getRpcPort(conf));
- - TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
- - client = new Cassandra.Client(binaryProtocol);
- - socket.open();
- + try
- + {
- + // create connection using thrift
- + socket = new TSocket(location, ConfigHelper.getRpcPort(conf));
- + TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
- + client = new Cassandra.Client(binaryProtocol);
- + socket.open();
- - // log in
- - client.set_keyspace(keyspace);
- - if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
- + // log in
- + client.set_keyspace(keyspace);
- + if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
- + {
- + Map<String, String> creds = new HashMap<String, String>();
- + creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
- + creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
- + AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- + client.login(authRequest);
- + }
- + break;
- + }
- + catch (Exception e)
- {
- - Map<String, String> creds = new HashMap<String, String>();
- - creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
- - creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
- - AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- - client.login(authRequest);
- + // connecting to this location failed; ignore the error and try the next one
- }
- }
- - catch (Exception e)
- +
- + if (socket == null || !socket.isOpen())
- {
- - throw new RuntimeException(e);
- + throw new RuntimeException("No location available! Tried: "+locations);
- }
- iter = new RowIterator();
- @@ -140,7 +149,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
- // we don't use endpointsnitch since we are trying to support hadoop nodes that are
- // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least.
- - private String getLocation()
- + private List<String> getLocations()
- {
- InetAddress[] localAddresses;
- try
- @@ -151,6 +160,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
- {
- throw new AssertionError(e);
- }
- +
- + List<String> result = new ArrayList<String>();
- +
- for (InetAddress address : localAddresses)
- {
- for (String location : split.getLocations())
- @@ -166,11 +178,15 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
- }
- if (address.equals(locationAddress))
- {
- - return location;
- + result.add(0, location);
- + }
- + else
- + {
- + result.add(location);
- }
- }
- }
- - return split.getLocations()[0];
- + return result;
- }
- private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
Advertisement
Add Comment
Please, Sign In to add comment