Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
- index 97b6753..0a7e6a7 100644
- --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
- +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
- @@ -151,6 +151,10 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
- {
- ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
- List<String> tokens = getSubSplits(keyspace, cfName, range, conf);
- + if(tokens.size() == 0)
- + {
- + return splits;
- + }
- // turn the sub-ranges into InputSplits
- String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
- @@ -173,6 +177,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
- private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
- {
- int splitsize = ConfigHelper.getInputSplitSize(conf);
- + boolean ignore = ConfigHelper.getIgnoreUnavailableRange(conf);
- +
- for (String host : range.endpoints)
- {
- try
- @@ -194,7 +200,16 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
- throw new RuntimeException(e);
- }
- }
- - throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
- + if(ignore)
- + {
- + logger.info("failed connecting to all endpoints " +
- + StringUtils.join(range.endpoints, ",") + "; ignored");
- + return new ArrayList<String>();
- + }
- + else
- + {
- + throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
- + }
- }
- private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
- index 0478ac7..03ae01a 100644
- --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
- +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
- @@ -51,6 +51,7 @@ public class ConfigHelper
- private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
- private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
- private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
- + private static final String IGNORE_UNAVAILABLE_RANGE_CONFIG = "cassandra.range.ignore";
- /**
- * Set the keyspace and column family for the input of this job.
- @@ -281,4 +282,14 @@ public class ConfigHelper
- throw new RuntimeException(e);
- }
- }
- +
- + public static void setIgnoreUnavailableRange(Configuration conf, boolean flag)
- + {
- + conf.setBoolean(IGNORE_UNAVAILABLE_RANGE_CONFIG, flag);
- + }
- +
- + public static boolean getIgnoreUnavailableRange(Configuration conf)
- + {
- + return conf.getBoolean(IGNORE_UNAVAILABLE_RANGE_CONFIG, false);
- + }
- }
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement