Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "github.com/hpcloud/tail"
- "github.com/montanaflynn/stats"
- "github.com/golang/glog"
- "fmt"
- "strings"
- "bytes"
- "bufio"
- "time"
- "strconv"
- "github.com/influxdata/influxdb/client/v2"
- "flag"
- "os"
- "os/signal"
- "syscall"
- )
// queue_map is one batch of raw log lines handed from main to a reader
// goroutine over the logs channel.
type queue_map struct {
	// date is the timestamp field shared by the lines in the batch.
	date string
	// data holds the newline-terminated log lines of the batch.
	data *bytes.Buffer
}
// Log parsing constants.
const (
	// TYPE_RTB is the Record.Type value that read_delivery assigns to lines
	// whose fifth field is "1".
	TYPE_RTB = "rtb"

	// layout is the time.Parse layout of the timestamp in the first log field.
	layout = "2006-01-02T15:04:05-07:00"
)

// InfluxDB settings used when building the client and batch points.
// NOTE(review): all four are empty strings here — presumably blanked before
// publishing; confirm the real values before deploying.
const (
	influx_db               = ""
	influx_username         = ""
	influx_password         = ""
	influx_retention_policy = ""
)
// Record is one successfully parsed log line. Log lines are ';'-separated and
// must have at least six fields to be turned into a Record.
type Record struct {
	// Date is the timestamp parsed from the first field.
	Date time.Time
	// ResponseTime is the second field multiplied by 1000
	// (presumably seconds converted to milliseconds — confirm against the
	// nginx log format).
	ResponseTime float64
	// ResponseCode is the third field parsed as an integer.
	ResponseCode int
	// URI is the fourth field, used as the grouping key for reports.
	URI string
	// Type is the record category ("rtb", "delivery" or "other").
	Type string
	// SocketRRT is the sixth field, stored verbatim.
	SocketRRT string
}
// Report is one percentile measurement for a URI and record type; do_report
// turns each Report into a single InfluxDB point.
//
// The field order is significant: Report values are also built with
// positional composite literals.
type Report struct {
	// URI is the request URI the measurement belongs to (written as a tag).
	URI string
	// Time is the timestamp attached to the point.
	Time time.Time
	// Type is the record category (written as a tag).
	Type string
	// Percentile is the percentile that was computed, e.g. 50 or 99.
	Percentile float64
	// Value is the response time at that percentile.
	Value float64
	// Count is the number of samples the percentile was computed from.
	Count float64
}
- var (
- logs = make(chan queue_map, 0)
- seek int64
- Host string
- Log_file = flag.String("log-file", "/var/log/nginx/upstream.log", "Input log file")
- Measurement = flag.String("measurement", "delivery_response_time", "InfluxDB Measurement")
- Seek_info = flag.String("seek-file", "/tmp/delivery_log_seek.info", "File contain seek info")
- Log_type = flag.String("type", "delivery", "Log type { delivery, rtb }")
- )
// FloatToString formats input_num in plain decimal notation with no
// fractional digits (the value is rounded to the nearest integer).
func FloatToString(input_num float64) string {
	const (
		decimalFormat  = 'f' // no exponent
		fractionDigits = 0
		floatBits      = 64
	)
	text := strconv.FormatFloat(input_num, decimalFormat, fractionDigits, floatBits)
	return text
}
- func write_last_pos() {
- fo, err := os.Create(*Seek_info)
- if err != nil {
- fmt.Println(err)
- return
- }
- defer fo.Close()
- _, err = fmt.Fprint(fo, seek)
- if err != nil {
- fmt.Println(err)
- }
- }
- func wait() {
- defer func() {
- if err := recover(); err != nil {
- fmt.Println(err)
- }
- write_last_pos()
- if err := c.Close(); err != nil {
- fmt.Println(err)
- }
- os.Exit(1)
- }()
- ch := make(chan os.Signal)
- signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- glog.Info(<-ch, " signal catched, starting close services")
- panic("SIGNAL Catch")
- }
- func do_report(report []Report) {
- //c, err := client.NewHTTPClient(client.HTTPConfig{
- // Addr: "http://monitor-adex.rtty.in:8086",
- // Username: influx_username,
- // Password: influx_password,
- // InsecureSkipVerify: true,
- // Timeout: time.Second,
- //})
- //if err != nil {
- //fmt.Println("Error: ", err)
- //}
- bp, err := client.NewBatchPoints(client.BatchPointsConfig{
- Database: influx_db,
- Precision: "s",
- RetentionPolicy: influx_retention_policy,
- })
- if err != nil {
- fmt.Println("Error: ", err)
- }
- Host, err = os.Hostname()
- if err != nil {
- fmt.Println("hostname error", err)
- }
- for _, v := range report {
- tags := map[string]string {
- "type": v.Type,
- "uri": v.URI,
- "percentile": FloatToString(v.Percentile),
- "host": Host,
- }
- fields := map[string]interface{}{
- "response_time": v.Value,
- "count": v.Count,
- }
- pt, err := client.NewPoint(*Measurement, tags, fields, v.Time)
- if err != nil {
- fmt.Println("Error: ", err)
- }
- bp.AddPoint(pt)
- }
- err = c.Write(bp)
- if err != nil {
- fmt.Println(err)
- }
- // c.Close()
- }
- func read_delivery() {
- for {
- if (logs != nil) {
- var records []Record
- var report []Report
- input := <-logs
- scanner := bufio.NewScanner(input.data)
- var tm time.Time
- for scanner.Scan() {
- data_array := strings.Split(scanner.Text(), ";")
- if len(data_array) < 6 {
- continue
- }
- if records == nil {
- records = make([]Record, 0, 100)
- }
- date, err := time.Parse(layout, data_array[0])
- if err != nil {
- //fmt.Printf("Parse error delivery: %v\n", err)
- fmt.Printf("Parse error delivery (%+v)(%+v):\t%+v\n", scanner.Text(), data_array[0], err)
- continue
- }
- tm = date
- duration_s, err := strconv.ParseFloat(data_array[1], 0)
- if err != nil {
- fmt.Printf("Parse float error: %v\n", err)
- continue
- }
- duration := duration_s * 1000
- code, err := strconv.Atoi(data_array[2])
- if err != nil {
- fmt.Printf("Convert error: %v\n", err)
- continue
- }
- uri := data_array[3]
- var recordType string
- switch data_array[4] {
- case "1":
- recordType = "rtb"
- default:
- recordType = "delivery"
- }
- socketRRT := data_array[5]
- records = append(records, Record{
- Date: date,
- ResponseTime: duration,
- ResponseCode: code,
- URI: uri,
- Type: recordType,
- SocketRRT: socketRRT,
- })
- }
- if len(records) == 0 {
- continue
- }
- groupByURI := make(map[string][]Record)
- var recordsDelivery []Record
- for _, record := range records {
- recordsDelivery = append(recordsDelivery, record)
- values := groupByURI[record.URI]
- if values == nil {
- values = make([]Record, 0, 10)
- }
- values = append(values, record)
- groupByURI[record.URI] = values
- }
- for k, v := range groupByURI {
- var rtb_count float64
- var delivery_count float64
- var rt []float64
- var rt_rtb []float64
- delivery_count = 0
- rtb_count = 0
- for _, a := range v {
- if a.Type == TYPE_RTB {
- rt_rtb = append(rt_rtb, a.ResponseTime)
- rtb_count++
- continue
- }
- rt = append(rt, a.ResponseTime)
- delivery_count++
- }
- for _, percentile_k := range []float64{50,90,95,96,97,98,99} {
- rt_val, _ := stats.Percentile(rt, percentile_k)
- rt_rtb_val, _ := stats.Percentile(rt_rtb, percentile_k)
- if len(rt) > 0 {
- report = append(report, Report{
- k,
- tm,
- "delivery",
- percentile_k,
- rt_val,
- delivery_count,
- })
- }
- if len(rt_rtb) > 0 {
- report = append(report, Report{
- k,
- tm,
- "rtb",
- percentile_k,
- rt_rtb_val,
- rtb_count,
- })
- }
- }
- }
- do_report(report)
- time.Tick(time.Millisecond * 10)
- }
- }
- }
- func read_other() {
- recordType := "other"
- for {
- if (logs != nil) {
- var records []Record
- var report []Report
- input := <-logs
- scanner := bufio.NewScanner(input.data)
- var tm time.Time
- for scanner.Scan() {
- data_array := strings.Split(scanner.Text(), ";")
- if len(data_array) < 6 {
- continue
- }
- if records == nil {
- records = make([]Record, 0, 100)
- }
- date, err := time.Parse(layout, data_array[0])
- if err != nil {
- fmt.Printf("Parse error: %v\n", err)
- continue
- }
- tm = date
- duration_s, err := strconv.ParseFloat(data_array[1], 0)
- if err != nil {
- fmt.Printf("Parse float error: %v\n", err)
- continue
- }
- duration := duration_s * 1000
- code, err := strconv.Atoi(data_array[2])
- if err != nil {
- fmt.Printf("Convert error: %v\n", err)
- continue
- }
- uri := data_array[3]
- socketRRT := data_array[5]
- records = append(records, Record{
- Date: date,
- ResponseTime: duration,
- ResponseCode: code,
- URI: uri,
- Type: recordType,
- SocketRRT: socketRRT,
- })
- }
- if len(records) == 0 {
- continue
- }
- groupByURI := make(map[string][]Record)
- var recordsOther []Record
- for _, record := range records {
- recordsOther = append(recordsOther, record)
- values := groupByURI[record.URI]
- if values == nil {
- values = make([]Record, 0, 10)
- }
- values = append(values, record)
- groupByURI[record.URI] = values
- }
- for k, v := range groupByURI {
- var rt []float64
- var count float64
- for _, a := range v {
- rt = append(rt, a.ResponseTime)
- count++
- }
- for _, percentile_k := range []float64{50,90,95,96,97,98,99} {
- rt_val, _ := stats.Percentile(rt, percentile_k)
- if len(rt) > 0 {
- report = append(report, Report{
- k,
- tm,
- recordType,
- percentile_k,
- rt_val,
- count,
- })
- }
- }
- }
- do_report(report)
- }
- }
- }
- var c client.Client
- func main() {
- flag.Parse()
- var err error
- c, err = client.NewHTTPClient(client.HTTPConfig{
- Addr: "http://monitor-adex.rtty.in:8086",
- Username: influx_username,
- Password: influx_password,
- InsecureSkipVerify: true,
- Timeout: time.Second,
- })
- if err != nil {
- fmt.Println("Error: ", err)
- return
- }
- var (
- date string
- cur_date string
- data *bytes.Buffer
- )
- defer func() {
- if err := recover(); err != nil {
- fmt.Println(err)
- }
- write_last_pos()
- if err := c.Close(); err != nil {
- fmt.Println(err)
- }
- os.Exit(1)
- }()
- go wait()
- if ( *Log_type == "delivery" ) {
- go read_delivery()
- } else {
- go read_other()
- }
- fi, err := os.Open(*Seek_info)
- if err != nil {
- write_last_pos()
- fi, err = os.Open(*Seek_info)
- if err != nil {
- fmt.Println(err)
- return
- }
- }
- scanner := bufio.NewScanner(fi)
- var read_pos string
- for scanner.Scan() {
- read_pos = scanner.Text()
- }
- seek_pos, _ := strconv.ParseInt(read_pos, 0, 64)
- fi.Close()
- config := tail.Config{
- Poll: true,
- Follow: true,
- ReOpen: true,
- Location: &tail.SeekInfo{seek_pos, 0},
- }
- t, err := tail.TailFile(*Log_file, config)
- if err != nil {
- fmt.Printf("trouble while tail file %v\n", err)
- return
- }
- n := 1
- for line := range t.Lines {
- if len(line.Text) == 0 {
- continue
- }
- cur_date = strings.Split(line.Text, ";")[0]
- _, err := time.Parse(layout, cur_date)
- if err != nil {
- fmt.Printf("Parse error main: %v\n", err)
- continue
- }
- if (len(strings.Split(line.Text, ";")) < 6) {
- continue
- }
- if (cur_date != date) {
- logs <- queue_map{date, data}
- data.Reset()
- n++
- date = cur_date
- }
- data.WriteString(line.Text + "\n")
- seek, err = t.Tell()
- if err != nil {
- fmt.Printf("Error: %v\n", err)
- continue
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement