Guest User

Untitled

a guest
Feb 22nd, 2019
180
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.52 KB | None | 0 0
  1. import org.apache.kafka.streams.KafkaStreams;
  2. import org.apache.kafka.streams.StreamsBuilder;
  3. import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
  4. import org.apache.kafka.streams.processor.ThreadMetadata;
  5. import org.apache.kafka.test.MockMetricsReporter;
  6. import org.apache.kafka.test.TestUtils;
  7. import org.junit.*;
  8.  
  9. import java.util.Properties;
  10. import java.util.Set;
  11.  
  12. public class KafkaStreamsDeadThreadTest {
  13.         @ClassRule
  14.         public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
  15.         private final StreamsBuilder builder = new StreamsBuilder();
  16.         private KafkaStreams globalStreams;
  17.         private Properties props;
  18.  
  19.         @Before
  20.         public void before() {
  21.                 builder.stream("not-exist-topic");
  22.  
  23.                 this.props = new Properties();
  24.                 this.props.put("application.id", "appId");
  25.                 this.props.put("bootstrap.servers", CLUSTER.bootstrapServers());
  26.                 this.props.put("metric.reporters", MockMetricsReporter.class.getName());
  27.                 this.props.put("state.dir", TestUtils.tempDirectory().getPath());
  28.                 this.props.put("num.stream.threads", 2);
  29.                 this.props.put("internal.leave.group.on.close", true);
  30.                 this.globalStreams = new KafkaStreams(this.builder.build(), this.props);
  31.         }
  32.  
  33.         @After
  34.         public void cleanup() {
  35.                 if (this.globalStreams != null) {
  36.                         this.globalStreams.close();
  37.                 }
  38.         }
  39.  
  40.         @Test
  41.         public void testErrorStateChanges() throws InterruptedException {
  42.                 this.globalStreams.start();
  43.                 TestUtils.waitForCondition(() -> {
  44.                         return this.globalStreams.state() == KafkaStreams.State.RUNNING;
  45.                 }, 10000L, "Streams never started.");
  46.  
  47.                 TestUtils.waitForCondition(() -> {
  48.                      return allThreadsInState(this.globalStreams.localThreadsMetadata(), "DEAD");
  49.                 }, 10000L, "Streams threads not changed to error state.");
  50.  
  51.                 Assert.assertEquals(KafkaStreams.State.ERROR, this.globalStreams.state());
  52.         }
  53.  
  54.         private boolean allThreadsInState(Set<ThreadMetadata> threads, String state){
  55.                 boolean result = true;
  56.                 for (ThreadMetadata thread:threads){
  57.                         if (!thread.threadState().equals(state)) result=false;
  58.                 }
  59.                 return result;
  60.         }
  61. }
Advertisement
Add Comment
Please, Sign In to add comment