package metrics

import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	log "github.com/hotstar/persona-commons/logging"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/push"
	"go.uber.org/zap"
)

// pushGatewayInterval is how often accumulated metrics are pushed to the
// Pushgateway by the background loop.
const pushGatewayInterval = 10 * time.Second

// PrometheusPushGatewayClient records counters, gauges and histograms and
// periodically pushes them to a Prometheus Pushgateway.
//
// Collectors are created lazily, one per distinct (metric name, label set)
// pair, and cached so that repeated calls update the same collector. All
// methods may be called from multiple goroutines. The struct contains a
// mutex and must not be copied; always use it through the pointer returned
// by NewPrometheusPushGatewayClient.
type PrometheusPushGatewayClient struct {
	pushGateWayClient *push.Pusher
	// registry holds every collector created by this client. It is attached
	// to the pusher as a Gatherer so registration errors stay local to the
	// offending collector instead of being stored inside the pusher.
	registry      *prometheus.Registry
	isInitialised bool
	separator     string

	// mu guards the three collector caches below.
	mu           sync.Mutex
	counterMap   map[string]prometheus.Counter
	gaugeMap     map[string]prometheus.Gauge
	histogramMap map[string]prometheus.Histogram

	// stop is closed by ShutDownClient to terminate the push loop.
	stop     chan struct{}
	stopOnce sync.Once
}

// NewPrometheusPushGatewayClient creates a client that pushes to the
// Pushgateway at endpoint under the given job name, and starts a background
// goroutine that pushes every pushGatewayInterval. Call ShutDownClient to
// stop that goroutine.
func NewPrometheusPushGatewayClient(endpoint, jobName string) *PrometheusPushGatewayClient {
	registry := prometheus.NewRegistry()
	c := &PrometheusPushGatewayClient{
		pushGateWayClient: push.New(endpoint, jobName).Gatherer(registry),
		registry:          registry,
		isInitialised:     true,
		separator:         "_",
		counterMap:        make(map[string]prometheus.Counter),
		gaugeMap:          make(map[string]prometheus.Gauge),
		histogramMap:      make(map[string]prometheus.Histogram),
		stop:              make(chan struct{}),
	}
	go c.pushLoop()
	return c
}

// pushLoop pushes the gathered metrics on every tick until the client is
// shut down. Push failures are logged and retried on the next tick.
func (c *PrometheusPushGatewayClient) pushLoop() {
	ticker := time.NewTicker(pushGatewayInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := c.pushGateWayClient.Add(); err != nil {
				log.Error("Could not push to push gateway ", zap.Error(err))
			}
		case <-c.stop:
			return
		}
	}
}

// IsInitialised reports whether the client was built by its constructor.
func (c *PrometheusPushGatewayClient) IsInitialised() bool {
	return c.isInitialised
}

// Increment adds one to the counter identified by key.
func (c *PrometheusPushGatewayClient) Increment(key string) {
	c.counterFor(c.formatKeys(key), nil).Inc()
}

// IncrementWithTags adds one to the counter identified by key and the given
// "name:value" tags.
func (c *PrometheusPushGatewayClient) IncrementWithTags(key string, tags []string) {
	c.counterFor(c.formatKeys(key), c.formatTags(tags)).Inc()
}

// Timing observes the time elapsed since the given instant in the histogram
// identified by key. The observation is the elapsed time.Duration converted
// to float64 (i.e. nanoseconds).
func (c *PrometheusPushGatewayClient) Timing(key string, since time.Time) {
	elapsed := float64(time.Since(since))
	c.histogramFor(c.formatKeys(key), nil).Observe(elapsed)
}

// TimingWithTags is like Timing but scopes the histogram to the given
// "name:value" tags.
func (c *PrometheusPushGatewayClient) TimingWithTags(key string, since time.Time, tags []string) {
	labels := c.formatTags(tags)
	elapsed := float64(time.Since(since))
	c.histogramFor(c.formatKeys(key), labels).Observe(elapsed)
}

// Gauge sets the gauge identified by key to value.
func (c *PrometheusPushGatewayClient) Gauge(key string, value float64) {
	c.gaugeFor(c.formatKeys(key), nil).Set(value)
}

// GaugeWithTags sets the gauge identified by key and the given "name:value"
// tags to value.
func (c *PrometheusPushGatewayClient) GaugeWithTags(key string, value float64, tags []string) {
	c.gaugeFor(c.formatKeys(key), c.formatTags(tags)).Set(value)
}

// Count adds value to the counter identified by key.
func (c *PrometheusPushGatewayClient) Count(key string, value int64) {
	c.counterFor(c.formatKeys(key), nil).Add(float64(value))
}

// CountWithTags adds value to the counter identified by key and the given
// "name:value" tags.
func (c *PrometheusPushGatewayClient) CountWithTags(key string, value int64, tags []string) {
	c.counterFor(c.formatKeys(key), c.formatTags(tags)).Add(float64(value))
}

// ShutDownClient stops the background push loop. It is safe to call more
// than once. Metrics recorded after shutdown are no longer pushed.
func (c *PrometheusPushGatewayClient) ShutDownClient() {
	c.stopOnce.Do(func() {
		// stop is nil only when the struct was not built by the constructor.
		if c.stop != nil {
			close(c.stop)
		}
	})
}

// counterFor returns the cached counter for name and labels, creating and
// registering it on first use.
func (c *PrometheusPushGatewayClient) counterFor(name string, labels map[string]string) prometheus.Counter {
	cacheKey := name + generateLabels(labels)

	c.mu.Lock()
	defer c.mu.Unlock()

	if existing, ok := c.counterMap[cacheKey]; ok {
		return existing
	}
	created := prometheus.NewCounter(prometheus.CounterOpts{
		Name:        name,
		ConstLabels: labels,
	})
	c.counterMap[cacheKey] = created
	c.register(created)
	return created
}

// gaugeFor returns the cached gauge for name and labels, creating and
// registering it on first use.
func (c *PrometheusPushGatewayClient) gaugeFor(name string, labels map[string]string) prometheus.Gauge {
	cacheKey := name + generateLabels(labels)

	c.mu.Lock()
	defer c.mu.Unlock()

	if existing, ok := c.gaugeMap[cacheKey]; ok {
		return existing
	}
	created := prometheus.NewGauge(prometheus.GaugeOpts{
		Name:        name,
		ConstLabels: labels,
	})
	c.gaugeMap[cacheKey] = created
	c.register(created)
	return created
}

// histogramFor returns the cached histogram for name and labels, creating
// and registering it on first use. Buckets are only computed when a new
// histogram is actually created.
func (c *PrometheusPushGatewayClient) histogramFor(name string, labels map[string]string) prometheus.Histogram {
	cacheKey := name + generateLabels(labels)

	c.mu.Lock()
	defer c.mu.Unlock()

	if existing, ok := c.histogramMap[cacheKey]; ok {
		return existing
	}
	created := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:        name,
		Buckets:     prometheus.ExponentialBuckets(float64(exponentialDurationBucketStart), exponentialBucketFactor, exponentialBucketCount),
		ConstLabels: labels,
	})
	c.histogramMap[cacheKey] = created
	c.register(created)
	return created
}

// register adds collector to the client's registry. A failure (for example
// an invalid metric name, or a name already used with a different label set)
// is logged once here; the caller still caches the collector so the error is
// not re-logged on every call, but that collector's values are not pushed.
func (c *PrometheusPushGatewayClient) register(collector prometheus.Collector) {
	if err := c.registry.Register(collector); err != nil {
		log.Error("Could not register collector ", zap.Error(err))
	}
}

// formatKeys replaces every "." and "-" in key with the client's separator.
func (c *PrometheusPushGatewayClient) formatKeys(key string) string {
	s := strings.ReplaceAll(key, ".", c.separator)
	return strings.ReplaceAll(s, "-", c.separator)
}

// formatTags converts "name:value" tags into a label map.
//
// Each tag is split at its first colon, so values may themselves contain
// colons. The name is normalised with formatKeys. A tag with no colon, or
// with an empty value, gets the value "nil". Tags whose name is empty are
// skipped. When the same name appears more than once, the last one wins.
func (c *PrometheusPushGatewayClient) formatTags(tags []string) map[string]string {
	tagMap := make(map[string]string, len(tags))
	for _, tag := range tags {
		name, value := tag, ""
		if i := strings.Index(tag, ":"); i >= 0 {
			name, value = tag[:i], tag[i+1:]
		}

		name = c.formatKeys(name)
		if name == "" {
			continue
		}
		if value == "" {
			value = "nil"
		}
		tagMap[name] = value
	}
	return tagMap
}

// LabelPair is a single label name/value pair.
type LabelPair struct {
	name  string
	value string
}

// generateLabels renders tagMap as `{k1="v1",k2="v2"}` with the pairs sorted
// by name, so the same label set always yields the same string. It returns
// the empty string for an empty or nil map.
func generateLabels(tagMap map[string]string) string {
	if len(tagMap) == 0 {
		return ""
	}

	pairs := make([]LabelPair, 0, len(tagMap))
	for name, value := range tagMap {
		pairs = append(pairs, LabelPair{name: name, value: value})
	}
	sort.Slice(pairs, func(i, j int) bool {
		return pairs[i].name < pairs[j].name
	})

	var sb strings.Builder
	sb.WriteByte('{')
	for i, pair := range pairs {
		if i > 0 {
			sb.WriteByte(',')
		}
		fmt.Fprintf(&sb, `%v="%v"`, pair.name, pair.value)
	}
	sb.WriteByte('}')
	return sb.String()
}