Advertisement
Guest User

Untitled

a guest
Nov 20th, 2019
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 4.24 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4.     "encoding/json"
  5.     "fmt"
  6.     "os"
  7.     "os/signal"
  8.     "syscall"
  9.  
  10.     "github.com/pkg/errors"
  11.     "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
  12. )
  13.  
  14. type kafkaMessage struct {
  15.     Key         json.Number `json:"key"`
  16.     Version     int64       `json:"version"`
  17.     Annotations string      `json:"annotations"`
  18.     Data        string      `json:"data"`
  19. }
  20. type kafkaAnnotations struct {
  21.     EntityType  string `json:"entity_type"`
  22.     CatalogSlug string `json:"catalog_slug"`
  23. }
  24.  
  25. var lookupTable = map[string]string{
  26.     "worten_pt:hera-entity-prd":   "hera-entity-pt",
  27.     "worten_es:hera-entity-prd":   "hera-entity-es",
  28.     "worten_pt:hera-product-prd":  "hera-product-pt",
  29.     "worten_es:hera-product-prd":  "hera-product-es",
  30.     "worten_pt:hera-category-prd": "hera-category-pt",
  31.     "worten_es:hera-category-prd": "hera-category-es",
  32.     "worten_pt:hera-list-prd":     "hera-list-pt",
  33.     "worten_es:hera-list-prd":     "hera-list-es",
  34. }
  35.  
  36. func unmarshal(m *kafka.Message) (kafkaMessage, kafkaAnnotations, error) {
  37.  
  38.     var msg kafkaMessage
  39.     var annotations kafkaAnnotations
  40.     if err := json.Unmarshal(m.Value, &msg); err != nil {
  41.         return msg, annotations, errors.Wrap(err, fmt.Sprintf("failed to unmarshal kafka message %s", m.Value))
  42.     }
  43.     if err := json.Unmarshal([]byte(msg.Annotations), &annotations); err != nil {
  44.         return msg, annotations, errors.Wrap(err, fmt.Sprintf("failed to unmarshal annotations message %s", m.Value))
  45.     }
  46.  
  47.     return msg, annotations, nil
  48. }
  49.  
  50. func createProducer(servers string) *kafka.Producer {
  51.     p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": servers})
  52.  
  53.     if err != nil {
  54.         panic("Failed to create producer")
  55.     }
  56.     return p
  57. }
  58.  
  59. func main() {
  60.     if len(os.Args) < 4 {
  61.         fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
  62.             os.Args[0])
  63.         os.Exit(1)
  64.     }
  65.  
  66.     broker := os.Args[1]
  67.     group := os.Args[2]
  68.     topics := os.Args[3:]
  69.     sigchan := make(chan os.Signal, 1)
  70.     signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
  71.  
  72.     p := createProducer(broker)
  73.  
  74.     c, err := kafka.NewConsumer(&kafka.ConfigMap{
  75.         "bootstrap.servers": broker,
  76.         // Avoid connecting to IPv6 brokers:
  77.         // This is needed for the ErrAllBrokersDown show-case below
  78.         // when using localhost brokers on OSX, since the OSX resolver
  79.         // will return the IPv6 addresses first.
  80.         // You typically don't need to specify this configuration property.
  81.         "broker.address.family": "v4",
  82.         "group.id":              group,
  83.         "session.timeout.ms":    6000,
  84.         "auto.offset.reset":     "earliest"})
  85.  
  86.     if err != nil {
  87.         fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
  88.         os.Exit(1)
  89.     }
  90.  
  91.     fmt.Printf("Created Consumer %v\n", c)
  92.  
  93.     err = c.SubscribeTopics(topics, nil)
  94.  
  95.     run := true
  96.     count := 0
  97.     for run == true {
  98.         select {
  99.         case sig := <-sigchan:
  100.             fmt.Printf("Caught signal %v: terminating\n", sig)
  101.             run = false
  102.         default:
  103.             ev := c.Poll(100)
  104.             if ev == nil {
  105.                 continue
  106.             }
  107.  
  108.             switch e := ev.(type) {
  109.             case *kafka.Message:
  110.                 m, a, err := unmarshal(e)
  111.                 if err != nil {
  112.                     fmt.Println(err)
  113.                 }
  114.                 _ = m
  115.                 lookupKey := fmt.Sprintf("%s:%s", a.CatalogSlug, *e.TopicPartition.Topic)
  116.                 destinationTopic, ok := lookupTable[lookupKey]
  117.  
  118.                 if !ok {
  119.                     fmt.Println("No existing lookup key", lookupKey)
  120.                     continue
  121.                 }
  122.  
  123.                 // fmt.Println("destination", destinationTopic)
  124.                 count++
  125.                 if count%10000 == 0 {
  126.                     fmt.Println(count)
  127.                 }
  128.                 p.Produce(&kafka.Message{
  129.                     Key:            e.Key,
  130.                     TopicPartition: kafka.TopicPartition{Topic: &destinationTopic, Partition: kafka.PartitionAny},
  131.                     Value:          e.Value,
  132.                 }, nil)
  133.  
  134.                 // fmt.Printf("%% Message on %s:\n%s\n",
  135.                 //  e.TopicPartition, string(e.Value))
  136.                 // if e.Headers != nil {
  137.                 //  fmt.Printf("%% Headers: %v\n", e.Headers)
  138.                 // }
  139.             case kafka.Error:
  140.                 // Errors should generally be considered
  141.                 // informational, the client will try to
  142.                 // automatically recover.
  143.                 // But in this example we choose to terminate
  144.                 // the application if all brokers are down.
  145.                 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
  146.                 if e.Code() == kafka.ErrAllBrokersDown {
  147.                     run = false
  148.                 }
  149.             default:
  150.                 fmt.Printf("Ignored %v\n", e)
  151.             }
  152.         }
  153.     }
  154.  
  155.     fmt.Printf("Closing consumer\n")
  156.     c.Close()
  157. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement