diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 97b6753..0a7e6a7 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -151,6 +151,10 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
{
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
List<String> tokens = getSubSplits(keyspace, cfName, range, conf);
+ if (tokens.isEmpty())
+ {
+ return splits; // no sub-ranges came back for this token range, so it contributes no splits
+ }
// turn the sub-ranges into InputSplits
String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
@@ -173,6 +177,8 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
{
int splitsize = ConfigHelper.getInputSplitSize(conf);
+ boolean ignore = ConfigHelper.getIgnoreUnavailableRange(conf); // true: a range with no reachable endpoint returns no sub-splits instead of throwing
+
for (String host : range.endpoints)
{
try
@@ -194,7 +200,16 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
throw new RuntimeException(e);
}
}
- throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
+ String failure = "failed connecting to all endpoints " + StringUtils.join(range.endpoints, ",");
+ if (!ignore)
+ {
+ throw new IOException(failure);
+ }
+
+ // The job asked to tolerate unavailable ranges: this range yields no sub-splits, so the job
+ // input is incomplete. Log that as a warning so it is not lost among info messages.
+ logger.warn(failure + "; ignored");
+ return new ArrayList<String>();
}
private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 0478ac7..03ae01a 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -51,6 +51,7 @@ public class ConfigHelper
private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
+ private static final String IGNORE_UNAVAILABLE_RANGE_CONFIG = "cassandra.range.ignore"; // boolean job flag, read with a default of false
/**
* Set the keyspace and column family for the input of this job.
@@ -281,4 +282,27 @@ public class ConfigHelper
throw new RuntimeException(e);
}
}
+
+ /**
+ * Choose what the input format does with a token range when none of its endpoints can be reached.
+ * The choice is stored as a boolean in the job configuration.
+ *
+ * @param conf Job configuration you are about to run
+ * @param flag true to leave such a range out of the computed splits,
+ * false to fail split computation with an IOException
+ */
+ public static void setIgnoreUnavailableRange(Configuration conf, boolean flag)
+ {
+ conf.setBoolean(IGNORE_UNAVAILABLE_RANGE_CONFIG, flag);
+ }
+
+ /**
+ * @param conf Job configuration to read the flag from
+ * @return true if token ranges with no reachable endpoint should be ignored;
+ * false when the flag was never set
+ */
+ public static boolean getIgnoreUnavailableRange(Configuration conf)
+ {
+ return conf.getBoolean(IGNORE_UNAVAILABLE_RANGE_CONFIG, false);
+ }
}