Advertisement
Guest User

Untitled

a guest
Aug 18th, 2019
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.76 KB | None | 0 0
  1. package helpers
  2.  
  3. import kafka.server.KafkaConfig
  4. import kafka.server.KafkaServerStartable
  5. import org.apache.zookeeper.server.ServerConfig
  6. import org.apache.zookeeper.server.ZooKeeperServerMain
  7. import org.apache.zookeeper.server.quorum.QuorumPeerConfig
  8. import java.net.InetSocketAddress
  9. import java.nio.file.Files
  10. import java.util.*
  11.  
  /**
   * Manual entry point: builds a [TestableKafkaServer] and discards the reference.
   *
   * All work happens as a side effect of construction. The Kafka broker is not
   * started here (`startKafkaBroker` is never called) and the server is never closed.
   */
  fun main() {
      // The instance is intentionally not kept; its initializer does the work.
      TestableKafkaServer()
  }
  15.  
  /**
   * Single-node ZooKeeper server plus a Kafka broker, intended for local or test use.
   *
   * Constructing an instance creates temporary directories for ZooKeeper and Kafka,
   * starts ZooKeeper on a background thread and prepares (but does not start) a Kafka
   * broker. Call [startKafkaBroker] to start the broker and [close] to shut both down.
   *
   * NOTE(review): the temporary directories are never deleted by this class.
   *
   * @param zooKeeperPort port used for ZooKeeper's client address and for the broker's
   *   `zookeeper.connect` setting (always on `localhost`). Defaults to 2181.
   */
  class TestableKafkaServer(private val zooKeeperPort: Int = 2181) : AutoCloseable {

      // Created once up front so the config getters below return a stable path
      // instead of allocating a fresh temp directory on every call.
      private val zooKeeperDataDir: String = Files.createTempDirectory("zookeeper").toString()
      private val zooKeeperDataLogDir: String = Files.createTempDirectory("zookeeper-logs").toString()

      /** ZooKeeper settings: temp data/log directories and the configured client port. */
      private val quorumConfiguration = object : QuorumPeerConfig() {
          override fun getDataDir(): String = zooKeeperDataDir
          override fun getDataLogDir(): String = zooKeeperDataLogDir
          override fun getClientPortAddress(): InetSocketAddress = InetSocketAddress(zooKeeperPort)
      }

      /** Exposes the inherited `shutdown()` of [ZooKeeperServerMain] as a public [stop]. */
      class StoppableZooKeeperServerMain : ZooKeeperServerMain() {
          fun stop() = shutdown()
      }

      val zooKeeperServer = StoppableZooKeeperServerMain()

      val zooKeeperConfig = ServerConfig()

      // Non-daemon on purpose (same as the original Thread subclass): it keeps
      // running after the constructing thread returns.
      private val zooKeeperThread = Thread(
          { zooKeeperServer.runFromConfig(zooKeeperConfig) },
          "testable-zookeeper-server",
      )

      init {
          // Copy the settings into the server config before the thread reads them.
          zooKeeperConfig.readFrom(quorumConfiguration)
          zooKeeperThread.start()
      }

      // Declared after the init block so ZooKeeper is started before the broker
      // object is built, matching the original ordering.
      private val kafka: KafkaServerStartable = createKafkaBroker()

      /** Builds the broker object from a minimal property set; does not start it. */
      private fun createKafkaBroker(): KafkaServerStartable {
          val kafkaProperties = Properties().apply {
              setProperty("zookeeper.connect", "localhost:$zooKeeperPort")
              setProperty("broker.id", "1")
              setProperty("log.dirs", Files.createTempDirectory("kafka-logs").toString())
          }
          return KafkaServerStartable(KafkaConfig.fromProps(kafkaProperties))
      }

      /** Starts the Kafka broker prepared during construction. */
      fun startKafkaBroker() {
          kafka.startup()
      }

      /**
       * Shuts down the Kafka broker, then stops ZooKeeper.
       *
       * ZooKeeper is stopped even if the broker shutdown throws.
       */
      override fun close() {
          try {
              kafka.shutdown()
          } finally {
              zooKeeperServer.stop()
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement