Guest User

Untitled

a guest
Jan 28th, 2018
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.76 KB | None | 0 0
  1. package kafkaboot;
  2.  
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.clients.producer.KafkaProducer;
  7. import org.apache.kafka.clients.producer.Producer;
  8. import org.apache.kafka.clients.producer.ProducerRecord;
  9. import org.apache.kafka.common.serialization.StringSerializer;
  10. import org.apache.kafka.common.serialization.StringDeserializer;
  11.  
  12. import java.util.Arrays;
  13. import java.util.Date;
  14. import java.util.Properties;
  15. //import java.lang.Runtime;
  16. //import java.io.PrintWriter;
  17.  
  18. public class KafkaClient {
  19. private final String topic;
  20. private final Properties props;
  21.  
  22. public KafkaClient(String brokers, String topic, String username, String password) {
  23. this.topic = topic;
  24. String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
  25. String jaasCfg = String.format(jaasTemplate, username, password);
  26.  
  27. String serializer = StringSerializer.class.getName();
  28. String deserializer = StringDeserializer.class.getName();
  29.  
  30. //running openssl stuff within java...
  31. //Runtime r = Runtime.getRuntime();
  32. /*
  33. try {
  34.  
  35. PrintWriter caWriter = new PrintWriter("d:/worklife/lab/cloudkafka/ca.pem", "UTF-8");
  36. String ca = "d:/worklife/lab/cloudkafka/ca.txt";
  37. caWriter.println(ca);
  38. caWriter.close();
  39.  
  40. PrintWriter certWriter = new PrintWriter("d:/worklife/lab/cloudkafka/cert.pem", "UTF-8");
  41. String cert = "d:/worklife/lab/cloudkafka/cert.txt";
  42. certWriter.println(cert);
  43. certWriter.close();
  44.  
  45. PrintWriter keyWriter = new PrintWriter("d:/worklife/lab/cloudkafka/key.pem", "UTF-8");
  46. String privateKey = "d:/worklife/lab/cloudkafka/key.txt";
  47. keyWriter.println(privateKey);
  48. keyWriter.close();
  49.  
  50. Process p = r.exec("d:/worklife/lab/openssl/openssl pkcs12 -export -password pass:test1234 -out /tmp/store.pkcs12 -inkey d:/worklife/lab/cloudkafka/key.pem -certfile /tmp/ca.pem -in d:/worklife/lab/cloudkafka/cert.pem -caname 'CA Root' -name client");
  51. p.waitFor();
  52.  
  53. p = r.exec("c:/program files/java/jdk1.8.0_131/keytool -importkeystore -noprompt -srckeystore /tmp/store.pkcs12 -destkeystore /tmp/keystore.jks -srcstoretype pkcs12 -srcstorepass test1234 -srckeypass test1234 -destkeypass test1234 -deststorepass test1234 -alias client");
  54. p.waitFor();
  55.  
  56. p = r.exec("c:/program files/java/jdk1.8.0_131/keytool -noprompt -keystore /tmp/truststore.jks -alias CARoot -import -file d:/worklife/lab/cloudkafka/ca.pem -storepass test1234");
  57. p.waitFor();
  58. } catch (Exception ex) {
  59. System.out.println(ex.getMessage());
  60. }
  61. */
  62.  
  63. props = new Properties();
  64. props.put("bootstrap.servers", brokers);
  65. props.put("group.id", "newer");
  66. props.put("enable.auto.commit", "true");
  67. props.put("auto.commit.interval.ms", "1000");
  68. props.put("auto.offset.reset", "earliest");
  69. props.put("session.timeout.ms", "10000");
  70. props.put("key.deserializer", deserializer);
  71. props.put("value.deserializer", deserializer);
  72. props.put("key.serializer", serializer);
  73. props.put("value.serializer", serializer);
  74. //props.put("security.protocol", "SASL_PLAINTEXT");
  75. props.put("security.protocol", "SASL_SSL");
  76. //props.put("security.protocol", "SSL");
  77. //props.put("ssl.truststore.location", "D:/worklife/lab/cloudkakfa/truststore.jks");
  78. //props.put("ssl.truststore.password", "test1234");
  79. //props.put("ssl.keystore.location", "D:/worklife/lab/cloudkakfa/keystore.jks");
  80. //props.put("ssl.keystore.password", "test1234");
  81. //props.put("ssl.keypassword", "test1234");
  82. props.put("sasl.mechanism", "SCRAM-SHA-256");
  83. props.put("sasl.jaas.config", jaasCfg);
  84. }
  85.  
  86. public String consume() {
  87. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  88. System.out.println(props.toString());
  89.  
  90. consumer.subscribe(Arrays.asList(topic));
  91. System.out.println(topic);
  92. ConsumerRecords<String, String> records = consumer.poll(100);
  93. for (ConsumerRecord<String, String> record : records)
  94. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
  95. consumer.close();
  96. return records.toString();
  97. }
  98.  
  99. public String produce() {
  100. try {
  101. Producer<String, String> producer = new KafkaProducer<>(props);
  102. Date d = new Date();
  103.  
  104. producer.send(new ProducerRecord<>(topic, "record", d.toString()));
  105. producer.close();
  106.  
  107. } catch (Exception ex) {
  108. System.out.println("Exception:" + ex.getMessage() + "\n" + ex.getStackTrace());
  109. return ex.getMessage();
  110. }
  111.  
  112. return "success";
  113.  
  114. /*
  115. Thread one = new Thread() {
  116. public void run() {
  117. try {
  118. Producer<String, String> producer = new KafkaProducer<>(props);
  119. System.out.println(props.toString());
  120. int i = 0;
  121. while(true) {
  122. Date d = new Date();
  123. producer.send(new ProducerRecord<>(topic, Integer.toString(i), d.toString()));
  124. //producer.
  125. Thread.sleep(1000);
  126. i++;
  127. }
  128. //producer.close();
  129. } catch (InterruptedException v) {
  130. System.out.println(v);
  131. }
  132. }
  133. };
  134. one.start();
  135. */
  136. }
  137.  
  138. }
Add Comment
Please, Sign In to add comment