Guest User

Untitled

a guest
Dec 10th, 2016
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.06 KB | None | 0 0
  1. import kafka.server.KafkaConfig;
  2. import kafka.server.KafkaServer;
  3.  
  4. import java.io.File;
  5. import java.io.FileNotFoundException;
  6. import java.util.ArrayList;
  7. import java.util.Collections;
  8. import java.util.List;
  9. import java.util.Properties;
  10.  
  11. public class EmbeddedKafkaCluster {
  12. private final List<Integer> ports;
  13. private final String zkConnection;
  14. private final Properties baseProperties;
  15.  
  16. private final String brokerList;
  17.  
  18. private final List<KafkaServer> brokers;
  19. private final List<File> logDirs;
  20.  
  21. public EmbeddedKafkaCluster(String zkConnection) {
  22. this(zkConnection, new Properties());
  23. }
  24.  
  25. public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) {
  26. this(zkConnection, baseProperties, Collections.singletonList(-1));
  27. }
  28.  
  29. public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, List<Integer> ports) {
  30. this.zkConnection = zkConnection;
  31. this.ports = resolvePorts(ports);
  32. this.baseProperties = baseProperties;
  33.  
  34. this.brokers = new ArrayList<KafkaServer>();
  35. this.logDirs = new ArrayList<File>();
  36.  
  37. this.brokerList = constructBrokerList(this.ports);
  38. }
  39.  
  40. private List<Integer> resolvePorts(List<Integer> ports) {
  41. List<Integer> resolvedPorts = new ArrayList<Integer>();
  42. for (Integer port : ports) {
  43. resolvedPorts.add(resolvePort(port));
  44. }
  45. return resolvedPorts;
  46. }
  47.  
  48. private int resolvePort(int port) {
  49. if (port == -1) {
  50. return TestUtils.getAvailablePort();
  51. }
  52. return port;
  53. }
  54.  
  55. private String constructBrokerList(List<Integer> ports) {
  56. StringBuilder sb = new StringBuilder();
  57. for (Integer port : ports) {
  58. if (sb.length() > 0) {
  59. sb.append(",");
  60. }
  61. sb.append("localhost:").append(port);
  62. }
  63. return sb.toString();
  64. }
  65.  
  66. public void startup() {
  67. for (int i = 0; i < ports.size(); i++) {
  68. Integer port = ports.get(i);
  69. File logDir = TestUtils.constructTempDir("kafka-local");
  70.  
  71. Properties properties = new Properties();
  72. properties.putAll(baseProperties);
  73. properties.setProperty("zookeeper.connect", zkConnection);
  74. properties.setProperty("broker.id", String.valueOf(i + 1));
  75. properties.setProperty("host.name", "localhost");
  76. properties.setProperty("port", Integer.toString(port));
  77. properties.setProperty("log.dir", logDir.getAbsolutePath());
  78. properties.setProperty("log.flush.interval.messages", String.valueOf(1));
  79.  
  80. KafkaServer broker = startBroker(properties);
  81.  
  82. brokers.add(broker);
  83. logDirs.add(logDir);
  84. }
  85. }
  86.  
  87.  
  88. private KafkaServer startBroker(Properties props) {
  89. KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime());
  90. server.startup();
  91. return server;
  92. }
  93.  
  94. public Properties getProps() {
  95. Properties props = new Properties();
  96. props.putAll(baseProperties);
  97. props.put("metadata.broker.list", brokerList);
  98. props.put("zookeeper.connect", zkConnection);
  99. return props;
  100. }
  101.  
  102. public String getBrokerList() {
  103. return brokerList;
  104. }
  105.  
  106. public List<Integer> getPorts() {
  107. return ports;
  108. }
  109.  
  110. public String getZkConnection() {
  111. return zkConnection;
  112. }
  113.  
  114. public void shutdown() {
  115. for (KafkaServer broker : brokers) {
  116. try {
  117. broker.shutdown();
  118. } catch (Exception e) {
  119. e.printStackTrace();
  120. }
  121. }
  122. for (File logDir : logDirs) {
  123. try {
  124. TestUtils.deleteFile(logDir);
  125. } catch (FileNotFoundException e) {
  126. e.printStackTrace();
  127. }
  128. }
  129. }
  130.  
  131. @Override
  132. public String toString() {
  133. final StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{");
  134. sb.append("brokerList='").append(brokerList).append('\'');
  135. sb.append('}');
  136. return sb.toString();
  137. }
  138. }
Add Comment
Please, Sign In to add comment