Advertisement
Guest User

Untitled

a guest
Mar 13th, 2017
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.74 KB | None | 0 0
  1. // The Kafka documentation makes it confusing to set up SASL/PLAIN authentication together with TLS / SSL; this is a minimal working example.
  2. // Make sure the keystore used on the Kafka cluster is built with the RSA algorithm, otherwise Go cannot talk to Java over TLS / SSL.
  3. package main
  4.  
  5. import (
  6. "crypto/tls"
  7. "fmt"
  8. "github.com/Shopify/sarama"
  9. )
  10.  
  11. //KafkaConsumerConfig ... structure to read kafka configuration settings
  12. type KafkaConsumerConfig struct {
  13. Brokers []string
  14. Topic string
  15. consumer sarama.Consumer
  16. client sarama.Client
  17. }
  18.  
  19. func main() {
  20. config := KafkaConsumerConfig{}
  21. config.Brokers = []string{"YOUR_BROKER_URL"}
  22. config.Topic = "YOUR_TOPIC"
  23.  
  24. consumerConfig := sarama.NewConfig()
  25. consumerConfig.Net.SASL.User = "<username>"
  26. consumerConfig.Net.SASL.Password = "<password>"
  27. consumerConfig.Net.SASL.Handshake = true
  28. consumerConfig.Net.SASL.Enable = true
  29.  
  30. consumerConfig.Net.TLS.Enable = true
  31. tlsConfig := &tls.Config{
  32. InsecureSkipVerify: true,
  33. ClientAuth: 0,
  34. }
  35. consumerConfig.Net.TLS.Config = tlsConfig
  36.  
  37. var err error
  38. config.client, err = sarama.NewClient(config.Brokers, consumerConfig)
  39. if err != nil {
  40. fmt.Println("Unable to create kafka client " + err.Error())
  41. return
  42. }
  43.  
  44. config.consumer, err = sarama.NewConsumerFromClient(config.client)
  45. if err != nil {
  46. fmt.Println("Unable to create new kafka consumer", err, config.client)
  47. return
  48. }
  49.  
  50. partitions, err := config.client.Partitions(config.Topic)
  51.  
  52. if err != nil {
  53. fmt.Println("Unable to fetch partition IDs for the topic", err, config.client, config.Topic)
  54. return
  55. }
  56.  
  57. fmt.Println("Partitions:", partitions)
  58.  
  59. topics, err := config.client.Topics()
  60. if err != nil {
  61. fmt.Println("Unable to fetch topics", err, config.client)
  62. return
  63. }
  64.  
  65. fmt.Println("Topics:", topics)
  66.  
  67. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement