Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package helpers
- import kafka.server.KafkaConfig
- import kafka.server.KafkaServerStartable
- import org.apache.zookeeper.server.ServerConfig
- import org.apache.zookeeper.server.ZooKeeperServerMain
- import org.apache.zookeeper.server.quorum.QuorumPeerConfig
- import java.net.InetSocketAddress
- import java.nio.file.Files
- import java.util.*
- fun main() {
- TestableKafkaServer()
- }
- class TestableKafkaServer : AutoCloseable {
- private val quorumConfiguration = object : QuorumPeerConfig() {
- override fun getDataDir(): String = Files.createTempDirectory("zookeeper").toString()
- override fun getDataLogDir(): String = Files.createTempDirectory("zookeeper-logs").toString()
- override fun getClientPortAddress(): InetSocketAddress = InetSocketAddress(2181)
- }
- class StoppableZooKeeperServerMain : ZooKeeperServerMain() {
- fun stop(): Unit = shutdown()
- }
- val zooKeeperServer = StoppableZooKeeperServerMain()
- val zooKeeperConfig = ServerConfig()
- private val zooKeeperThread = object : Thread() {
- override fun run(): Unit = zooKeeperServer.runFromConfig(zooKeeperConfig)
- }
- private var kafka: KafkaServerStartable? = null
- init {
- zooKeeperConfig.readFrom(quorumConfiguration)
- zooKeeperThread.start()
- val kafkaProperties = Properties()
- kafkaProperties["zookeeper.connect"] = "localhost:2181"
- kafkaProperties["broker.id"] = "1"
- kafkaProperties["log.dirs"] = Files.createTempDirectory("kafka-logs").toString()
- val kafkaConfig = KafkaConfig.fromProps(kafkaProperties)
- kafka = KafkaServerStartable(kafkaConfig)
- }
- fun startKafkaBroker() {
- kafka?.startup()
- }
- override fun close() {
- kafka?.shutdown()
- zooKeeperServer.stop()
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement