Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package kafkaboot;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.Properties;
- //import java.lang.Runtime;
- //import java.io.PrintWriter;
- public class KafkaClient {
- private final String topic;
- private final Properties props;
- public KafkaClient(String brokers, String topic, String username, String password) {
- this.topic = topic;
- String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
- String jaasCfg = String.format(jaasTemplate, username, password);
- String serializer = StringSerializer.class.getName();
- String deserializer = StringDeserializer.class.getName();
- //running openssl stuff within java...
- //Runtime r = Runtime.getRuntime();
- /*
- try {
- PrintWriter caWriter = new PrintWriter("d:/worklife/lab/cloudkafka/ca.pem", "UTF-8");
- String ca = "d:/worklife/lab/cloudkafka/ca.txt";
- caWriter.println(ca);
- caWriter.close();
- PrintWriter certWriter = new PrintWriter("d:/worklife/lab/cloudkafka/cert.pem", "UTF-8");
- String cert = "d:/worklife/lab/cloudkafka/cert.txt";
- certWriter.println(cert);
- certWriter.close();
- PrintWriter keyWriter = new PrintWriter("d:/worklife/lab/cloudkafka/key.pem", "UTF-8");
- String privateKey = "d:/worklife/lab/cloudkafka/key.txt";
- keyWriter.println(privateKey);
- keyWriter.close();
- Process p = r.exec("d:/worklife/lab/openssl/openssl pkcs12 -export -password pass:test1234 -out /tmp/store.pkcs12 -inkey d:/worklife/lab/cloudkafka/key.pem -certfile /tmp/ca.pem -in d:/worklife/lab/cloudkafka/cert.pem -caname 'CA Root' -name client");
- p.waitFor();
- p = r.exec("c:/program files/java/jdk1.8.0_131/keytool -importkeystore -noprompt -srckeystore /tmp/store.pkcs12 -destkeystore /tmp/keystore.jks -srcstoretype pkcs12 -srcstorepass test1234 -srckeypass test1234 -destkeypass test1234 -deststorepass test1234 -alias client");
- p.waitFor();
- p = r.exec("c:/program files/java/jdk1.8.0_131/keytool -noprompt -keystore /tmp/truststore.jks -alias CARoot -import -file d:/worklife/lab/cloudkafka/ca.pem -storepass test1234");
- p.waitFor();
- } catch (Exception ex) {
- System.out.println(ex.getMessage());
- }
- */
- props = new Properties();
- props.put("bootstrap.servers", brokers);
- props.put("group.id", "newer");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("auto.offset.reset", "earliest");
- props.put("session.timeout.ms", "10000");
- props.put("key.deserializer", deserializer);
- props.put("value.deserializer", deserializer);
- props.put("key.serializer", serializer);
- props.put("value.serializer", serializer);
- //props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("security.protocol", "SASL_SSL");
- //props.put("security.protocol", "SSL");
- //props.put("ssl.truststore.location", "D:/worklife/lab/cloudkakfa/truststore.jks");
- //props.put("ssl.truststore.password", "test1234");
- //props.put("ssl.keystore.location", "D:/worklife/lab/cloudkakfa/keystore.jks");
- //props.put("ssl.keystore.password", "test1234");
- //props.put("ssl.keypassword", "test1234");
- props.put("sasl.mechanism", "SCRAM-SHA-256");
- props.put("sasl.jaas.config", jaasCfg);
- }
- public String consume() {
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- System.out.println(props.toString());
- consumer.subscribe(Arrays.asList(topic));
- System.out.println(topic);
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
- consumer.close();
- return records.toString();
- }
- public String produce() {
- try {
- Producer<String, String> producer = new KafkaProducer<>(props);
- Date d = new Date();
- producer.send(new ProducerRecord<>(topic, "record", d.toString()));
- producer.close();
- } catch (Exception ex) {
- System.out.println("Exception:" + ex.getMessage() + "\n" + ex.getStackTrace());
- return ex.getMessage();
- }
- return "success";
- /*
- Thread one = new Thread() {
- public void run() {
- try {
- Producer<String, String> producer = new KafkaProducer<>(props);
- System.out.println(props.toString());
- int i = 0;
- while(true) {
- Date d = new Date();
- producer.send(new ProducerRecord<>(topic, Integer.toString(i), d.toString()));
- //producer.
- Thread.sleep(1000);
- i++;
- }
- //producer.close();
- } catch (InterruptedException v) {
- System.out.println(v);
- }
- }
- };
- one.start();
- */
- }
- }
Add Comment
Please, Sign In to add comment