Advertisement
Guest User

Untitled

a guest
Oct 26th, 2016
1,026
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.16 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "fmt"
  5. "github.com/Shopify/sarama"
  6. "github.com/bsm/sarama-cluster"
  7. "log"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "syscall"
  12. )
  13.  
  14. var (
  15. logger = log.New(os.Stdout, "", 0)
  16. )
  17.  
  18. func trigger1() {
  19. brokerList := "x.x.x.x:9092"
  20. groupID := "test-cons-1"
  21. topicList := "test"
  22. config := cluster.NewConfig()
  23.  
  24. config.Consumer.Return.Errors = true
  25. config.Group.Return.Notifications = true
  26. config.Consumer.Offsets.Initial = sarama.OffsetOldest // Does not work unless u use a different grp id
  27.  
  28. consumer, err := cluster.NewConsumer(strings.Split(brokerList, ","), groupID, strings.Split(topicList, ","), config)
  29. if err != nil {
  30. logger.Println("Failed to start consumer: %s", err)
  31. }
  32.  
  33. go func() {
  34. for err := range consumer.Errors() {
  35. logger.Printf("Error: %s\n", err.Error())
  36. }
  37. }()
  38.  
  39. go func() {
  40. for note := range consumer.Notifications() {
  41. logger.Printf("Rebalanced: %+v\n", note)
  42. }
  43. }()
  44.  
  45. go func() {
  46. for msg := range consumer.Messages() {
  47. fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
  48. consumer.MarkOffset(msg, "")
  49. }
  50. }()
  51.  
  52. wait := make(chan os.Signal, 1)
  53. signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
  54. <-wait
  55. if err := consumer.Close(); err != nil {
  56. logger.Println("Failed to close consumer: ", err)
  57. }
  58. }
  59.  
  60. func trigger2() {
  61. config := sarama.NewConfig()
  62. config.Consumer.Fetch.Max = 1
  63.  
  64. cons, err := sarama.NewConsumer([]string{"x.x.x.x:9092"}, config)
  65. if err != nil {
  66. logger.Println("Error creating consumer ", err)
  67. }
  68. defer func() {
  69. if err := cons.Close(); err != nil {
  70. logger.Fatalln(err)
  71. }
  72. }()
  73.  
  74. pc, err := cons.ConsumePartition("test", 0, sarama.OffsetOldest)
  75. if err != nil {
  76. logger.Println(err)
  77. }
  78. defer func() {
  79. if err := pc.Close(); err != nil {
  80. logger.Fatalln(err)
  81. }
  82. }()
  83.  
  84. signals := make(chan os.Signal, 1)
  85. signal.Notify(signals, os.Interrupt)
  86. consumed := 0
  87. ConsumerLoop:
  88. for {
  89. select {
  90. case msg := <-pc.Messages():
  91. {
  92. logger.Printf("Message : %s\nOffset : %d, Highwatermark: %d", string(msg.Value), msg.Offset, pc.HighWaterMarkOffset())
  93. consumed++
  94. }
  95. case <-signals:
  96. {
  97. break ConsumerLoop
  98. }
  99. }
  100. }
  101. <-signals
  102. }
  103.  
  104. func main() {
  105. trigger2()
  106. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement