Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
- package metrics
- import (
- "fmt"
- "sort"
- "strings"
- "time"
- log "github.com/hotstar/persona-commons/logging"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/push"
- "go.uber.org/zap"
- )
- type PrometheusPushGatewayClient struct {
- pushGateWayClient *push.Pusher
- isInitialised bool
- separator string
- counterMap map[string]prometheus.Counter
- gaugeMap map[string]prometheus.Gauge
- histogramMap map[string]prometheus.Histogram
- }
- func NewPrometheusPushGatewayClient(endpoint, jobName string) *PrometheusPushGatewayClient {
- pushGateWayClient := push.New(endpoint, jobName)
- ticker := time.NewTicker(10 * time.Second)
- go func() {
- for {
- select {
- case <-ticker.C:
- if err := pushGateWayClient.Add(); err != nil {
- log.Error("Could not push to push gateway ", zap.Error(err))
- }
- }
- }
- }()
- return &PrometheusPushGatewayClient{
- pushGateWayClient: pushGateWayClient,
- isInitialised: true,
- separator: "_",
- counterMap: make(map[string]prometheus.Counter),
- gaugeMap: make(map[string]prometheus.Gauge),
- histogramMap: make(map[string]prometheus.Histogram),
- }
- }
- func (c *PrometheusPushGatewayClient) IsInitialised() bool {
- return c.isInitialised
- }
- func (c *PrometheusPushGatewayClient) Increment(key string) {
- var incrementCollector prometheus.Counter
- FQName := c.formatKeys(key)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.counterMap[FQName]; ok {
- incrementCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- incrementCollector = prometheus.NewCounter(prometheus.CounterOpts{
- Name: FQName,
- })
- c.counterMap[FQName] = incrementCollector
- c.pushGateWayClient.Collector(incrementCollector)
- }
- incrementCollector.Inc()
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Counter)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- func (c *PrometheusPushGatewayClient) IncrementWithTags(key string, tags []string) {
- var incrementCollector prometheus.Counter
- tagMap := c.formatTags(tags)
- formattedKeys := c.formatKeys(key)
- FQName := formattedKeys + generateLabels(tagMap)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.counterMap[FQName]; ok {
- incrementCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- incrementCollector = prometheus.NewCounter(prometheus.CounterOpts{
- Name: formattedKeys,
- ConstLabels: tagMap,
- })
- c.counterMap[FQName] = incrementCollector
- c.pushGateWayClient.Collector(incrementCollector)
- }
- incrementCollector.Inc()
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Counter)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- func (c *PrometheusPushGatewayClient) Timing(key string, since time.Time) {
- var timeCollector prometheus.Histogram
- FQName := c.formatKeys(key)
- duration := float64(time.Since(since))
- buckets := prometheus.ExponentialBuckets(float64(exponentialDurationBucketStart), exponentialBucketFactor, exponentialBucketCount)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.histogramMap[FQName]; ok {
- timeCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- timeCollector = prometheus.NewHistogram(prometheus.HistogramOpts{
- Name: FQName,
- Buckets: buckets,
- })
- c.histogramMap[FQName] = timeCollector
- c.pushGateWayClient.Collector(timeCollector)
- }
- timeCollector.Observe(duration)
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Histogram)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- func (c *PrometheusPushGatewayClient) TimingWithTags(key string, since time.Time, tags []string) {
- var timeCollector prometheus.Histogram
- tagMap := c.formatTags(tags)
- formattedKeys := c.formatKeys(key)
- FQName := formattedKeys + generateLabels(tagMap)
- duration := float64(time.Since(since))
- buckets := prometheus.ExponentialBuckets(float64(exponentialDurationBucketStart), exponentialBucketFactor, exponentialBucketCount)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.histogramMap[FQName]; ok {
- timeCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- timeCollector = prometheus.NewHistogram(prometheus.HistogramOpts{
- Name: formattedKeys,
- Buckets: buckets,
- ConstLabels: tagMap,
- })
- c.histogramMap[FQName] = timeCollector
- c.pushGateWayClient.Collector(timeCollector)
- }
- timeCollector.Observe(duration)
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Histogram)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- func (c *PrometheusPushGatewayClient) Gauge(key string, value float64) {
- var gaugeCollector prometheus.Gauge
- FQName := c.formatKeys(key)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.gaugeMap[FQName]; ok {
- gaugeCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- gaugeCollector = prometheus.NewGauge(prometheus.GaugeOpts{
- Name: FQName,
- })
- c.gaugeMap[FQName] = gaugeCollector
- c.pushGateWayClient.Collector(gaugeCollector)
- }
- gaugeCollector.Set(value)
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Gauge)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- func (c *PrometheusPushGatewayClient) GaugeWithTags(key string, value float64, tags []string) {
- var gaugeCollector prometheus.Gauge
- tagMap := c.formatTags(tags)
- formattedKeys := c.formatKeys(key)
- FQName := formattedKeys + generateLabels(tagMap)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.gaugeMap[FQName]; ok {
- gaugeCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- gaugeCollector = prometheus.NewGauge(prometheus.GaugeOpts{
- Name: formattedKeys,
- ConstLabels: tagMap,
- })
- c.gaugeMap[FQName] = gaugeCollector
- c.pushGateWayClient.Collector(gaugeCollector)
- }
- gaugeCollector.Set(value)
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Gauge)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- func (c *PrometheusPushGatewayClient) Count(key string, value int64) {
- var countCollector prometheus.Counter
- FQName := c.formatKeys(key)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.counterMap[FQName]; ok {
- countCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- countCollector = prometheus.NewCounter(prometheus.CounterOpts{
- Name: FQName,
- })
- c.counterMap[FQName] = countCollector
- c.pushGateWayClient.Collector(countCollector)
- }
- countCollector.Add(float64(value))
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Counter)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- func (c *PrometheusPushGatewayClient) CountWithTags(key string, value int64, tags []string) {
- var countCollector prometheus.Counter
- tagMap := c.formatTags(tags)
- formattedKeys := c.formatKeys(key)
- FQName := formattedKeys + generateLabels(tagMap)
- log.Warn("FQName: ", zap.Any("FQName: ", FQName))
- if collector, ok := c.counterMap[FQName]; ok {
- countCollector = collector
- log.Warn("Found old collector", zap.Any("Collector: ", collector.Desc().String()))
- } else {
- countCollector = prometheus.NewCounter(prometheus.CounterOpts{
- Name: formattedKeys,
- ConstLabels: tagMap,
- })
- c.counterMap[FQName] = countCollector
- c.pushGateWayClient.Collector(countCollector)
- }
- countCollector.Add(float64(value))
- //if err := c.pushGateWayClient.Add(); err != nil {
- // log.Error("Could not push to push gateway ", zap.Error(err))
- // if err, ok := err.(prometheus.AlreadyRegisteredError); ok {
- // oldCollector := err.ExistingCollector.(prometheus.Counter)
- // log.Error(oldCollector.Desc().String())
- // }
- //}
- }
- // ShutDownClient No shutdown function present.
- func (c *PrometheusPushGatewayClient) ShutDownClient() {
- }
- func (c *PrometheusPushGatewayClient) formatKeys(key string) string {
- s := strings.Replace(key, ".", c.separator, -1)
- return strings.Replace(s, "-", c.separator, -1)
- }
- func (c *PrometheusPushGatewayClient) formatTags(tags []string) map[string]string {
- tagMap := make(map[string]string)
- for _, v := range tags {
- tagSplit := strings.Split(v, ":")
- tagKey := c.formatKeys(tagSplit[0])
- tagValue := tagSplit[1]
- if tagValue == "" {
- tagValue = "nil"
- }
- tagMap[tagKey] = tagValue
- }
- return tagMap
- }
// LabelPair is one label name together with its value; generateLabels uses it
// to sort labels by name.
type LabelPair struct {
	name  string
	value string
}

// generateLabels renders tagMap as `{name="value",...}` with the labels sorted
// by name, so equal maps always yield the same string. An empty or nil map
// yields "". Values are written verbatim; no escaping is applied.
func generateLabels(tagMap map[string]string) string {
	if len(tagMap) == 0 {
		return ""
	}

	pairs := make([]LabelPair, 0, len(tagMap))
	for name, value := range tagMap {
		pairs = append(pairs, LabelPair{name: name, value: value})
	}
	// Map iteration order is random; sort to get a stable rendering.
	sort.Slice(pairs, func(i, j int) bool {
		return pairs[i].name < pairs[j].name
	})

	var sb strings.Builder
	sb.WriteByte('{')
	for i, pair := range pairs {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteString(pair.name)
		sb.WriteString(`="`)
		sb.WriteString(pair.value)
		sb.WriteByte('"')
	}
	sb.WriteByte('}')
	return sb.String()
}
Advertisement
Add Comment
Please, Sign In to add comment