Advertisement
Guest User

Untitled

a guest
Mar 9th, 2019
159
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.35 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "fmt"
  5. "time"
  6.  
  7. "github.com/Shopify/sarama"
  8. "github.com/mufti1/kafka-example/producer"
  9. "github.com/sirupsen/logrus"
  10. )
  11.  
  12. func main() {
  13. // Setup Logging
  14. customFormatter := new(logrus.TextFormatter)
  15. customFormatter.TimestampFormat = "2006-01-02 15:04:05"
  16. customFormatter.FullTimestamp = true
  17. logrus.SetFormatter(customFormatter)
  18.  
  19. kafkaConfig := getKafkaConfig("", "")
  20. producers, err := sarama.NewSyncProducer([]string{"kafka:9092"}, kafkaConfig)
  21. if err != nil {
  22. logrus.Errorf("Unable to create kafka producer got error %v", err)
  23. return
  24. }
  25. defer func() {
  26. if err := producers.Close(); err != nil {
  27. logrus.Errorf("Unable to stop kafka producer: %v", err)
  28. return
  29. }
  30. }()
  31.  
  32. logrus.Infof("Success create kafka sync-producer")
  33.  
  34. kafka := &producer.KafkaProducer{
  35. Producer: producers,
  36. }
  37.  
  38. for i := 1; i <= 10; i++ {
  39. msg := fmt.Sprintf("message number %v", i)
  40. err := kafka.SendMessage("test_topic", msg)
  41. if err != nil {
  42. panic(err)
  43. }
  44. }
  45. }
  46.  
  47. func getKafkaConfig(username, password string) *sarama.Config {
  48. kafkaConfig := sarama.NewConfig()
  49. kafkaConfig.Producer.Return.Successes = true
  50. kafkaConfig.Net.WriteTimeout = 5 * time.Second
  51. kafkaConfig.Producer.Retry.Max = 0
  52.  
  53. if username != "" {
  54. kafkaConfig.Net.SASL.Enable = true
  55. kafkaConfig.Net.SASL.User = username
  56. kafkaConfig.Net.SASL.Password = password
  57. }
  58. return kafkaConfig
  59. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement