Advertisement
Guest User

Untitled

a guest
Jun 17th, 2019
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.07 KB | None | 0 0
  1. package kafka
  2.  
  3. import (
  4. "fmt"
  5. "log"
  6. "time"
  7.  
  8. "github.com/Shopify/sarama"
  9. )
  10.  
  11. // Init func to init kafka producer and consumer
  12. func Init() {
  13. config := sarama.NewConfig()
  14. config.Producer.Return.Successes = true
  15. config.Producer.RequiredAcks = sarama.WaitForAll
  16. config.Producer.Partitioner = sarama.NewRandomPartitioner
  17.  
  18. producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
  19.  
  20. if err != nil {
  21. fmt.Printf("%s \n", err)
  22. log.Fatal(err)
  23. }
  24.  
  25. consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
  26.  
  27. if err != nil {
  28. log.Fatal(err)
  29. }
  30.  
  31. partitionList, err := consumer.Partitions("CAR_NUMBER")
  32.  
  33. if err != nil {
  34. log.Fatal(err)
  35. }
  36.  
  37. for partition := range partitionList {
  38. partitionConsumer, err := consumer.ConsumePartition("CAR_NUMBER", int32(partition), sarama.OffsetNewest)
  39. if err != nil {
  40. log.Fatal(err)
  41. }
  42.  
  43. go func(sarama.PartitionConsumer) {
  44. for msg := range partitionConsumer.Messages() {
  45. db.Create(&src.CarSecondKillRecord{Message: string(msg.Value), Offset: int(msg.Offset), Time: time.Now().Local()})
  46. }
  47. }(partitionConsumer)
  48. }
  49.  
  50. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement