mody

cassnadra-hadoop-CFRR.patch

Aug 17th, 2011
158
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 4.92 KB | None | 0 0
  1. diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
  2. index b0bf427..1602607 100644
  3. --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
  4. +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
  5. @@ -87,6 +87,10 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
  6.      
  7.      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
  8.      {
  9. +        // only need to connect once
  10. +        if (socket != null && socket.isOpen())
  11. +            return;
  12. +
  13.          this.split = (ColumnFamilySplit) split;
  14.          Configuration conf = context.getConfiguration();
  15.          predicate = ConfigHelper.getInputSlicePredicate(conf);
  16. @@ -98,33 +102,38 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
  17.          
  18.          keyspace = ConfigHelper.getInputKeyspace(conf);
  19.          
  20. -        try
  21. +        List<String> locations = getLocations();
  22. +        for(String location : locations)
  23.          {
  24. -            // only need to connect once
  25. -            if (socket != null && socket.isOpen())
  26. -                return;
  27. -
  28. -            // create connection using thrift
  29. -            String location = getLocation();
  30. -            socket = new TSocket(location, ConfigHelper.getRpcPort(conf));
  31. -            TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
  32. -            client = new Cassandra.Client(binaryProtocol);
  33. -            socket.open();
  34. +            try
  35. +            {
  36. +                // create connection using thrift
  37. +                socket = new TSocket(location, ConfigHelper.getRpcPort(conf));
  38. +                TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
  39. +                client = new Cassandra.Client(binaryProtocol);
  40. +                socket.open();
  41.  
  42. -            // log in
  43. -            client.set_keyspace(keyspace);
  44. -            if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
  45. +                // log in
  46. +                client.set_keyspace(keyspace);
  47. +                if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
  48. +                {
  49. +                    Map<String, String> creds = new HashMap<String, String>();
  50. +                    creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
  51. +                    creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
  52. +                    AuthenticationRequest authRequest = new AuthenticationRequest(creds);
  53. +                    client.login(authRequest);
  54. +                }
  55. +                break;
  56. +            }
  57. +            catch (Exception e)
  58.              {
  59. -                Map<String, String> creds = new HashMap<String, String>();
  60. -                creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
  61. -                creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
  62. -                AuthenticationRequest authRequest = new AuthenticationRequest(creds);
  63. -                client.login(authRequest);
  64. +                //pass
  65.              }
  66.          }
  67. -        catch (Exception e)
  68. +
  69. +        if (socket == null || !socket.isOpen())
  70.          {
  71. -            throw new RuntimeException(e);
  72. +            throw new RuntimeException("No location available! Tried: "+locations);
  73.          }
  74.  
  75.          iter = new RowIterator();
  76. @@ -140,7 +149,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
  77.  
  78.      // we don't use endpointsnitch since we are trying to support hadoop nodes that are
  79.      // not necessarily on Cassandra machines, too.  This should be adequate for single-DC clusters, at least.
  80. -    private String getLocation()
  81. +    private List<String> getLocations()
  82.      {
  83.          InetAddress[] localAddresses;
  84.          try
  85. @@ -151,6 +160,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
  86.          {
  87.              throw new AssertionError(e);
  88.          }
  89. +
  90. +        List<String> result = new ArrayList<String>();
  91. +
  92.          for (InetAddress address : localAddresses)
  93.          {
  94.              for (String location : split.getLocations())
  95. @@ -166,11 +178,15 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
  96.                  }
  97.                  if (address.equals(locationAddress))
  98.                  {
  99. -                    return location;
  100. +                    result.add(0, location);
  101. +                }
  102. +                else
  103. +                {
  104. +                    result.add(location);
  105.                  }
  106.              }
  107.          }
  108. -        return split.getLocations()[0];
  109. +        return result;
  110.      }
  111.  
  112.      private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
Advertisement
Add Comment
Please, Sign In to add comment