Guest User

Untitled

a guest
Jan 21st, 2018
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.11 KB | None | 0 0
  1. package org.apache.flume.kafka;
  2.  
  3. import com.cloudera.flume.core.Event;
  4. import com.cloudera.flume.core.EventImpl;
  5. import com.cloudera.util.Clock;
  6. import kafka.api.FetchRequest;
  7. import kafka.javaapi.consumer.SimpleConsumer;
  8. import kafka.message.Message;
  9. import kafka.server.KafkaConfig;
  10. import kafka.server.KafkaServer;
  11. import kafka.utils.TestUtils;
  12. import kafka.utils.TestZKUtils;
  13. import kafka.utils.Utils;
  14. import kafka.zk.EmbeddedZookeeper;
  15. import org.junit.After;
  16. import org.junit.Before;
  17. import org.junit.Ignore;
  18. import org.junit.Test;
  19.  
  20. import java.util.Iterator;
  21. import java.util.Properties;
  22.  
  23. import static junit.framework.Assert.*;
  24.  
  25. public class TestKafaSink {
  26.  
  27. private EmbeddedZookeeper zkServer;
  28.  
  29. private int port = 9092;
  30. private KafkaServer server;
  31. private SimpleConsumer consumer;
  32. private KafkaSink kafkaSink;
  33.  
  34. @Before
  35. public void before() {
  36. zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
  37. Properties props = TestUtils.createBrokerConfig(0, port);
  38. props.setProperty("num.partitions", "2");
  39. props.setProperty("topic.partition.count.map", "test:2");
  40. KafkaConfig config = new KafkaConfig(props);
  41. server = TestUtils.createServer(config);
  42. consumer = new SimpleConsumer("localhost", port, 1000000, 64*1024);
  43. }
  44.  
  45. @Test
  46. public void appendsMessage() throws Exception {
  47. kafkaSink = new KafkaSink(TestZKUtils.zookeeperConnect(), "test");
  48. kafkaSink.open();
  49. Event e = new EventImpl("test1".getBytes(), Clock.unixTime(), Event.Priority.INFO, 0, "localhost");
  50. kafkaSink.append(e);
  51. Thread.sleep(100);
  52. Iterator<Message> messageSet1 = consumer.fetch(new FetchRequest("test", 0, 0, 10000)).iterator();
  53. assertTrue("Message set should have 1 message", messageSet1.hasNext());
  54. assertEquals(new Message("test1".getBytes()), messageSet1.next());
  55. }
  56.  
  57. @Test
  58. public void sampleKeyGoesToCorrectPartition() {
  59. assertEquals(new String("testPartitionKey".getBytes()).hashCode() % 2, 1);
  60. }
  61.  
  62. @Test @Ignore
  63. public void canSendToPartition() throws Exception {
  64. kafkaSink = new KafkaSink(TestZKUtils.zookeeperConnect(), "test");
  65. kafkaSink.open();
  66. Event e = new EventImpl("test1".getBytes(), Clock.unixTime(), Event.Priority.INFO, 0, "localhost");
  67. e.set("kafka.partition.key", "testPartitionKey".getBytes());
  68. kafkaSink.append(e);
  69. Thread.sleep(100);
  70. Iterator<Message> messageSet1 = consumer.fetch(new FetchRequest("test", 1, 0, 10000)).iterator();
  71. assertTrue("Message set should have 1 message", messageSet1.hasNext());
  72. assertEquals(new Message("test1".getBytes()), messageSet1.next());
  73. }
  74.  
  75. @Test(expected = IllegalArgumentException.class)
  76. public void requiresZkConnectionString() {
  77. KafkaSink.builder().create(null, "", "test");
  78. fail();
  79. }
  80.  
  81. @Test(expected = IllegalArgumentException.class)
  82. public void requiresTopic() {
  83. KafkaSink.builder().create(null, "localhost:2181", "");
  84. fail();
  85. }
  86.  
  87. @After
  88. public void after() throws Exception {
  89. if (kafkaSink != null) kafkaSink.close();
  90.  
  91. server.shutdown();
  92. Utils.rm(server.config().logDir());
  93. Utils.rm(server.config().logDir());
  94. Thread.sleep(500);
  95. zkServer.shutdown();
  96. }
  97.  
  98. }
Add Comment
Please, Sign In to add comment