Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package kafka
- import (
- "fmt"
- "log"
- "time"
- "github.com/Shopify/sarama"
- )
- // Init func to init kafka producer and consumer
- func Init() {
- config := sarama.NewConfig()
- config.Producer.Return.Successes = true
- config.Producer.RequiredAcks = sarama.WaitForAll
- config.Producer.Partitioner = sarama.NewRandomPartitioner
- producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
- if err != nil {
- fmt.Printf("%s \n", err)
- log.Fatal(err)
- }
- consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
- if err != nil {
- log.Fatal(err)
- }
- partitionList, err := consumer.Partitions("CAR_NUMBER")
- if err != nil {
- log.Fatal(err)
- }
- for partition := range partitionList {
- partitionConsumer, err := consumer.ConsumePartition("CAR_NUMBER", int32(partition), sarama.OffsetNewest)
- if err != nil {
- log.Fatal(err)
- }
- go func(sarama.PartitionConsumer) {
- for msg := range partitionConsumer.Messages() {
- db.Create(&src.CarSecondKillRecord{Message: string(msg.Value), Offset: int(msg.Offset), Time: time.Now().Local()})
- }
- }(partitionConsumer)
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement