Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
- import org.apache.kafka.streams.processor.ThreadMetadata;
- import org.apache.kafka.test.MockMetricsReporter;
- import org.apache.kafka.test.TestUtils;
- import org.junit.*;
- import java.util.Properties;
- import java.util.Set;
- public class KafkaStreamsDeadThreadTest {
- @ClassRule
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
- private final StreamsBuilder builder = new StreamsBuilder();
- private KafkaStreams globalStreams;
- private Properties props;
- @Before
- public void before() {
- builder.stream("not-exist-topic");
- this.props = new Properties();
- this.props.put("application.id", "appId");
- this.props.put("bootstrap.servers", CLUSTER.bootstrapServers());
- this.props.put("metric.reporters", MockMetricsReporter.class.getName());
- this.props.put("state.dir", TestUtils.tempDirectory().getPath());
- this.props.put("num.stream.threads", 2);
- this.props.put("internal.leave.group.on.close", true);
- this.globalStreams = new KafkaStreams(this.builder.build(), this.props);
- }
- @After
- public void cleanup() {
- if (this.globalStreams != null) {
- this.globalStreams.close();
- }
- }
- @Test
- public void testErrorStateChanges() throws InterruptedException {
- this.globalStreams.start();
- TestUtils.waitForCondition(() -> {
- return this.globalStreams.state() == KafkaStreams.State.RUNNING;
- }, 10000L, "Streams never started.");
- TestUtils.waitForCondition(() -> {
- return allThreadsInState(this.globalStreams.localThreadsMetadata(), "DEAD");
- }, 10000L, "Streams threads not changed to error state.");
- Assert.assertEquals(KafkaStreams.State.ERROR, this.globalStreams.state());
- }
- private boolean allThreadsInState(Set<ThreadMetadata> threads, String state){
- boolean result = true;
- for (ThreadMetadata thread:threads){
- if (!thread.threadState().equals(state)) result=false;
- }
- return result;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment