Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package io.confluent.examples.streams.kafka;
- import static org.junit.Assert.*;
- import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
- import java.util.Properties;
- import kafka.server.KafkaConfig;
- import org.junit.AfterClass;
- import org.junit.ClassRule;
- import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class EmbeddedSingleNodeKafkaClusterTest {
- private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaClusterTest.class);
- @ClassRule
- public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(
- new Properties() {
- {
- //Transactions need durability so the defaults require multiple nodes.
- //For testing purposes set transactions to work with a single kafka broker.
- put(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1");
- put(KafkaConfig.TransactionsTopicMinISRProp(), "1");
- put(KafkaConfig.TransactionsTopicPartitionsProp(), "1");
- put(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG, "60000");
- }
- });
- @AfterClass
- public static void stopCluster() {
- log.info("stopping cluster");
- if (CLUSTER.isRunning()) {
- CLUSTER.stop();
- }
- }
- @Test
- public void deleteTopics() throws InterruptedException {
- final String topicName = "topic";
- log.info("Creating topic {}", topicName);
- CLUSTER.createTopic(topicName);
- log.info("Successfully created topic. About to delete.");
- final boolean success = CLUSTER.deleteTopicsAndWait(30000, topicName);
- log.info("Successful in deleting topic: {}", success);
- assertTrue(success);
- log.info("Waiting 60 seconds");
- Thread.sleep(60000);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement