Advertisement
mody

cassandra-hadoop-CFIF2.patch

Aug 18th, 2011
205
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 3.27 KB | None | 0 0
  1. diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
  2. index 97b6753..0a7e6a7 100644
  3. --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
  4. +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
  5. @@ -151,6 +151,10 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
  6.          {
  7.              ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
  8.              List<String> tokens = getSubSplits(keyspace, cfName, range, conf);
  9. +            if(tokens.size() == 0)
  10. +            {
  11. +                return splits;
  12. +            }
  13.  
  14.              // turn the sub-ranges into InputSplits
  15.              String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
  16. @@ -173,6 +177,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
  17.      private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
  18.      {
  19.          int splitsize = ConfigHelper.getInputSplitSize(conf);
  20. +        boolean ignore = ConfigHelper.getIgnoreUnavailableRange(conf);
  21. +
  22.          for (String host : range.endpoints)
  23.          {
  24.              try
  25. @@ -194,7 +200,16 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
  26.                  throw new RuntimeException(e);
  27.              }
  28.          }
  29. -        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
  30. +        if(ignore)
  31. +        {
  32. +            logger.info("failed connecting to all endpoints " +
  33. +                StringUtils.join(range.endpoints, ",") + "; ignored");
  34. +            return new ArrayList<String>();
  35. +        }
  36. +        else
  37. +        {
  38. +            throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
  39. +        }
  40.      }
  41.  
  42.      private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
  43. diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
  44. index 0478ac7..03ae01a 100644
  45. --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
  46. +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
  47. @@ -51,6 +51,7 @@ public class ConfigHelper
  48.      private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
  49.      private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
  50.      private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
  51. +    private static final String IGNORE_UNAVAILABLE_RANGE_CONFIG = "cassandra.range.ignore";
  52.  
  53.      /**
  54.       * Set the keyspace and column family for the input of this job.
  55. @@ -281,4 +282,14 @@ public class ConfigHelper
  56.              throw new RuntimeException(e);
  57.          }
  58.      }
  59. +
  60. +    public static void setIgnoreUnavailableRange(Configuration conf, boolean flag)
  61. +    {
  62. +        conf.setBoolean(IGNORE_UNAVAILABLE_RANGE_CONFIG, flag);
  63. +    }
  64. +
  65. +    public static boolean getIgnoreUnavailableRange(Configuration conf)
  66. +    {
  67. +        return conf.getBoolean(IGNORE_UNAVAILABLE_RANGE_CONFIG, false);
  68. +    }
  69.  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement