Guest User

Untitled

a guest
Jan 10th, 2023
212
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 10.65 KB | Source Code | 0 0
  1. package metrics
  2.  
  3. import (
  4.     "fmt"
  5.     "sort"
  6.     "strings"
  7.     "time"
  8.  
  9.     log "github.com/hotstar/persona-commons/logging"
  10.     "github.com/prometheus/client_golang/prometheus"
  11.     "github.com/prometheus/client_golang/prometheus/push"
  12.     "go.uber.org/zap"
  13. )
  14.  
  15. type PrometheusPushGatewayClient struct {
  16.     pushGateWayClient *push.Pusher
  17.     isInitialised     bool
  18.     separator         string
  19.     counterMap        map[string]prometheus.Counter
  20.     gaugeMap          map[string]prometheus.Gauge
  21.     histogramMap      map[string]prometheus.Histogram
  22. }
  23.  
  24. func NewPrometheusPushGatewayClient(endpoint, jobName string) *PrometheusPushGatewayClient {
  25.     pushGateWayClient := push.New(endpoint, jobName)
  26.  
  27.     ticker := time.NewTicker(10 * time.Second)
  28.     go func() {
  29.         for {
  30.             select {
  31.             case <-ticker.C:
  32.                 if err := pushGateWayClient.Add(); err != nil {
  33.                     log.Error("Could not push to push gateway ", zap.Error(err))
  34.                 }
  35.             }
  36.         }
  37.     }()
  38.  
  39.     return &PrometheusPushGatewayClient{
  40.         pushGateWayClient: pushGateWayClient,
  41.         isInitialised:     true,
  42.         separator:         "_",
  43.         counterMap:        make(map[string]prometheus.Counter),
  44.         gaugeMap:          make(map[string]prometheus.Gauge),
  45.         histogramMap:      make(map[string]prometheus.Histogram),
  46.     }
  47. }
  48.  
  49. func (c *PrometheusPushGatewayClient) IsInitialised() bool {
  50.     return c.isInitialised
  51. }
  52.  
  53. func (c *PrometheusPushGatewayClient) Increment(key string) {
  54.     var incrementCollector prometheus.Counter
  55.     FQName := c.formatKeys(key)
  56.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  57.     if collector, ok := c.counterMap[FQName]; ok {
  58.         incrementCollector = collector
  59.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  60.     } else {
  61.         incrementCollector = prometheus.NewCounter(prometheus.CounterOpts{
  62.             Name: FQName,
  63.         })
  64.         c.counterMap[FQName] = incrementCollector
  65.         c.pushGateWayClient.Collector(incrementCollector)
  66.     }
  67.     incrementCollector.Inc()
  68.     //if err := c.pushGateWayClient.Add(); err != nil {
  69.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  70.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  71.     //      oldCollector := err.ExistingCollector.(prometheus.Counter)
  72.     //      log.Error(oldCollector.Desc().String())
  73.     //  }
  74.     //}
  75. }
  76.  
  77. func (c *PrometheusPushGatewayClient) IncrementWithTags(key string, tags []string) {
  78.     var incrementCollector prometheus.Counter
  79.     tagMap := c.formatTags(tags)
  80.     formattedKeys := c.formatKeys(key)
  81.     FQName := formattedKeys + generateLabels(tagMap)
  82.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  83.     if collector, ok := c.counterMap[FQName]; ok {
  84.         incrementCollector = collector
  85.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  86.     } else {
  87.         incrementCollector = prometheus.NewCounter(prometheus.CounterOpts{
  88.             Name:        formattedKeys,
  89.             ConstLabels: tagMap,
  90.         })
  91.         c.counterMap[FQName] = incrementCollector
  92.         c.pushGateWayClient.Collector(incrementCollector)
  93.     }
  94.     incrementCollector.Inc()
  95.     //if err := c.pushGateWayClient.Add(); err != nil {
  96.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  97.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  98.     //      oldCollector := err.ExistingCollector.(prometheus.Counter)
  99.     //      log.Error(oldCollector.Desc().String())
  100.     //  }
  101.     //}
  102. }
  103.  
  104. func (c *PrometheusPushGatewayClient) Timing(key string, since time.Time) {
  105.     var timeCollector prometheus.Histogram
  106.     FQName := c.formatKeys(key)
  107.     duration := float64(time.Since(since))
  108.     buckets := prometheus.ExponentialBuckets(float64(exponentialDurationBucketStart), exponentialBucketFactor, exponentialBucketCount)
  109.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  110.     if collector, ok := c.histogramMap[FQName]; ok {
  111.         timeCollector = collector
  112.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  113.     } else {
  114.         timeCollector = prometheus.NewHistogram(prometheus.HistogramOpts{
  115.             Name:    FQName,
  116.             Buckets: buckets,
  117.         })
  118.         c.histogramMap[FQName] = timeCollector
  119.         c.pushGateWayClient.Collector(timeCollector)
  120.     }
  121.     timeCollector.Observe(duration)
  122.     //if err := c.pushGateWayClient.Add(); err != nil {
  123.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  124.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  125.     //      oldCollector := err.ExistingCollector.(prometheus.Histogram)
  126.     //      log.Error(oldCollector.Desc().String())
  127.     //  }
  128.     //}
  129. }
  130.  
  131. func (c *PrometheusPushGatewayClient) TimingWithTags(key string, since time.Time, tags []string) {
  132.     var timeCollector prometheus.Histogram
  133.     tagMap := c.formatTags(tags)
  134.     formattedKeys := c.formatKeys(key)
  135.     FQName := formattedKeys + generateLabels(tagMap)
  136.     duration := float64(time.Since(since))
  137.     buckets := prometheus.ExponentialBuckets(float64(exponentialDurationBucketStart), exponentialBucketFactor, exponentialBucketCount)
  138.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  139.     if collector, ok := c.histogramMap[FQName]; ok {
  140.         timeCollector = collector
  141.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  142.     } else {
  143.         timeCollector = prometheus.NewHistogram(prometheus.HistogramOpts{
  144.             Name:        formattedKeys,
  145.             Buckets:     buckets,
  146.             ConstLabels: tagMap,
  147.         })
  148.         c.histogramMap[FQName] = timeCollector
  149.         c.pushGateWayClient.Collector(timeCollector)
  150.     }
  151.     timeCollector.Observe(duration)
  152.     //if err := c.pushGateWayClient.Add(); err != nil {
  153.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  154.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  155.     //      oldCollector := err.ExistingCollector.(prometheus.Histogram)
  156.     //      log.Error(oldCollector.Desc().String())
  157.     //  }
  158.     //}
  159. }
  160.  
  161. func (c *PrometheusPushGatewayClient) Gauge(key string, value float64) {
  162.     var gaugeCollector prometheus.Gauge
  163.     FQName := c.formatKeys(key)
  164.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  165.     if collector, ok := c.gaugeMap[FQName]; ok {
  166.         gaugeCollector = collector
  167.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  168.     } else {
  169.         gaugeCollector = prometheus.NewGauge(prometheus.GaugeOpts{
  170.             Name: FQName,
  171.         })
  172.         c.gaugeMap[FQName] = gaugeCollector
  173.         c.pushGateWayClient.Collector(gaugeCollector)
  174.     }
  175.     gaugeCollector.Set(value)
  176.     //if err := c.pushGateWayClient.Add(); err != nil {
  177.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  178.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  179.     //      oldCollector := err.ExistingCollector.(prometheus.Gauge)
  180.     //      log.Error(oldCollector.Desc().String())
  181.     //  }
  182.     //}
  183. }
  184.  
  185. func (c *PrometheusPushGatewayClient) GaugeWithTags(key string, value float64, tags []string) {
  186.     var gaugeCollector prometheus.Gauge
  187.     tagMap := c.formatTags(tags)
  188.     formattedKeys := c.formatKeys(key)
  189.     FQName := formattedKeys + generateLabels(tagMap)
  190.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  191.     if collector, ok := c.gaugeMap[FQName]; ok {
  192.         gaugeCollector = collector
  193.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  194.     } else {
  195.         gaugeCollector = prometheus.NewGauge(prometheus.GaugeOpts{
  196.             Name:        formattedKeys,
  197.             ConstLabels: tagMap,
  198.         })
  199.         c.gaugeMap[FQName] = gaugeCollector
  200.         c.pushGateWayClient.Collector(gaugeCollector)
  201.     }
  202.     gaugeCollector.Set(value)
  203.     //if err := c.pushGateWayClient.Add(); err != nil {
  204.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  205.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  206.     //      oldCollector := err.ExistingCollector.(prometheus.Gauge)
  207.     //      log.Error(oldCollector.Desc().String())
  208.     //  }
  209.     //}
  210. }
  211.  
  212. func (c *PrometheusPushGatewayClient) Count(key string, value int64) {
  213.     var countCollector prometheus.Counter
  214.     FQName := c.formatKeys(key)
  215.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  216.     if collector, ok := c.counterMap[FQName]; ok {
  217.         countCollector = collector
  218.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  219.     } else {
  220.         countCollector = prometheus.NewCounter(prometheus.CounterOpts{
  221.             Name: FQName,
  222.         })
  223.         c.counterMap[FQName] = countCollector
  224.         c.pushGateWayClient.Collector(countCollector)
  225.     }
  226.     countCollector.Add(float64(value))
  227.     //if err := c.pushGateWayClient.Add(); err != nil {
  228.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  229.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  230.     //      oldCollector := err.ExistingCollector.(prometheus.Counter)
  231.     //      log.Error(oldCollector.Desc().String())
  232.     //  }
  233.     //}
  234. }
  235.  
  236. func (c *PrometheusPushGatewayClient) CountWithTags(key string, value int64, tags []string) {
  237.     var countCollector prometheus.Counter
  238.     tagMap := c.formatTags(tags)
  239.     formattedKeys := c.formatKeys(key)
  240.     FQName := formattedKeys + generateLabels(tagMap)
  241.     log.Warn("FQName: ", zap.Any("FQName: ", FQName))
  242.     if collector, ok := c.counterMap[FQName]; ok {
  243.         countCollector = collector
  244.         log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
  245.     } else {
  246.         countCollector = prometheus.NewCounter(prometheus.CounterOpts{
  247.             Name:        formattedKeys,
  248.             ConstLabels: tagMap,
  249.         })
  250.         c.counterMap[FQName] = countCollector
  251.         c.pushGateWayClient.Collector(countCollector)
  252.     }
  253.     countCollector.Add(float64(value))
  254.     //if err := c.pushGateWayClient.Add(); err != nil {
  255.     //  log.Error("Could not push to push gateway ", zap.Error(err))
  256.     //  if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
  257.     //      oldCollector := err.ExistingCollector.(prometheus.Counter)
  258.     //      log.Error(oldCollector.Desc().String())
  259.     //  }
  260.     //}
  261. }
  262.  
  263. // ShutDownClient No shutdown function present.
  264. func (c *PrometheusPushGatewayClient) ShutDownClient() {
  265. }
  266.  
  267. func (c *PrometheusPushGatewayClient) formatKeys(key string) string {
  268.     s := strings.Replace(key, ".", c.separator, -1)
  269.     return strings.Replace(s, "-", c.separator, -1)
  270. }
  271.  
  272. func (c *PrometheusPushGatewayClient) formatTags(tags []string) map[string]string {
  273.     tagMap := make(map[string]string)
  274.     for _, v := range tags {
  275.         tagSplit := strings.Split(v, ":")
  276.         tagKey := c.formatKeys(tagSplit[0])
  277.         tagValue := tagSplit[1]
  278.         if tagValue == "" {
  279.             tagValue = "nil"
  280.         }
  281.         tagMap[tagKey] = tagValue
  282.     }
  283.     return tagMap
  284. }
  285.  
  286. type LabelPair struct {
  287.     name  string
  288.     value string
  289. }
  290.  
  291. func generateLabels(tagMap map[string]string) string {
  292.     if len(tagMap) == 0 {
  293.         return ""
  294.     }
  295.     labelPairSlice := make([]LabelPair, 0)
  296.     for tagKey, tagValue := range tagMap {
  297.         labelPairSlice = append(labelPairSlice, LabelPair{name: tagKey, value: tagValue})
  298.     }
  299.     sort.Slice(labelPairSlice, func(i, j int) bool {
  300.         return labelPairSlice[i].name < labelPairSlice[j].name
  301.     })
  302.     var sb strings.Builder
  303.     sb.WriteRune('{')
  304.     for _, label := range labelPairSlice {
  305.         sb.WriteString(fmt.Sprintf(`%v="%v"`, label.name, label.value))
  306.         sb.WriteRune(',')
  307.     }
  308.     str := sb.String()
  309.     str = str[:len(str)-1]
  310.     str += "}"
  311.     return str
  312. }
  313.  
Advertisement
Add Comment
Please, Sign In to add comment