Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "encoding/json"
- "fmt"
- "os"
- "os/signal"
- "syscall"
- "github.com/pkg/errors"
- "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
- )
- type kafkaMessage struct {
- Key json.Number `json:"key"`
- Version int64 `json:"version"`
- Annotations string `json:"annotations"`
- Data string `json:"data"`
- }
- type kafkaAnnotations struct {
- EntityType string `json:"entity_type"`
- CatalogSlug string `json:"catalog_slug"`
- }
- var lookupTable = map[string]string{
- "worten_pt:hera-entity-prd": "hera-entity-pt",
- "worten_es:hera-entity-prd": "hera-entity-es",
- "worten_pt:hera-product-prd": "hera-product-pt",
- "worten_es:hera-product-prd": "hera-product-es",
- "worten_pt:hera-category-prd": "hera-category-pt",
- "worten_es:hera-category-prd": "hera-category-es",
- "worten_pt:hera-list-prd": "hera-list-pt",
- "worten_es:hera-list-prd": "hera-list-es",
- }
- func unmarshal(m *kafka.Message) (kafkaMessage, kafkaAnnotations, error) {
- var msg kafkaMessage
- var annotations kafkaAnnotations
- if err := json.Unmarshal(m.Value, &msg); err != nil {
- return msg, annotations, errors.Wrap(err, fmt.Sprintf("failed to unmarshal kafka message %s", m.Value))
- }
- if err := json.Unmarshal([]byte(msg.Annotations), &annotations); err != nil {
- return msg, annotations, errors.Wrap(err, fmt.Sprintf("failed to unmarshal annotations message %s", m.Value))
- }
- return msg, annotations, nil
- }
- func createProducer(servers string) *kafka.Producer {
- p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": servers})
- if err != nil {
- panic("Failed to create producer")
- }
- return p
- }
- func main() {
- if len(os.Args) < 4 {
- fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
- os.Args[0])
- os.Exit(1)
- }
- broker := os.Args[1]
- group := os.Args[2]
- topics := os.Args[3:]
- sigchan := make(chan os.Signal, 1)
- signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
- p := createProducer(broker)
- c, err := kafka.NewConsumer(&kafka.ConfigMap{
- "bootstrap.servers": broker,
- // Avoid connecting to IPv6 brokers:
- // This is needed for the ErrAllBrokersDown show-case below
- // when using localhost brokers on OSX, since the OSX resolver
- // will return the IPv6 addresses first.
- // You typically don't need to specify this configuration property.
- "broker.address.family": "v4",
- "group.id": group,
- "session.timeout.ms": 6000,
- "auto.offset.reset": "earliest"})
- if err != nil {
- fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
- os.Exit(1)
- }
- fmt.Printf("Created Consumer %v\n", c)
- err = c.SubscribeTopics(topics, nil)
- run := true
- count := 0
- for run == true {
- select {
- case sig := <-sigchan:
- fmt.Printf("Caught signal %v: terminating\n", sig)
- run = false
- default:
- ev := c.Poll(100)
- if ev == nil {
- continue
- }
- switch e := ev.(type) {
- case *kafka.Message:
- m, a, err := unmarshal(e)
- if err != nil {
- fmt.Println(err)
- }
- _ = m
- lookupKey := fmt.Sprintf("%s:%s", a.CatalogSlug, *e.TopicPartition.Topic)
- destinationTopic, ok := lookupTable[lookupKey]
- if !ok {
- fmt.Println("No existing lookup key", lookupKey)
- continue
- }
- // fmt.Println("destination", destinationTopic)
- count++
- if count%10000 == 0 {
- fmt.Println(count)
- }
- p.Produce(&kafka.Message{
- Key: e.Key,
- TopicPartition: kafka.TopicPartition{Topic: &destinationTopic, Partition: kafka.PartitionAny},
- Value: e.Value,
- }, nil)
- // fmt.Printf("%% Message on %s:\n%s\n",
- // e.TopicPartition, string(e.Value))
- // if e.Headers != nil {
- // fmt.Printf("%% Headers: %v\n", e.Headers)
- // }
- case kafka.Error:
- // Errors should generally be considered
- // informational, the client will try to
- // automatically recover.
- // But in this example we choose to terminate
- // the application if all brokers are down.
- fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
- if e.Code() == kafka.ErrAllBrokersDown {
- run = false
- }
- default:
- fmt.Printf("Ignored %v\n", e)
- }
- }
- }
- fmt.Printf("Closing consumer\n")
- c.Close()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement