Advertisement
Guest User

11

a guest
Sep 13th, 2016
138
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.77 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "github.com/hpcloud/tail"
  5. "github.com/montanaflynn/stats"
  6. "github.com/golang/glog"
  7. "fmt"
  8. "strings"
  9. "bytes"
  10. "bufio"
  11. "time"
  12. "strconv"
  13. "github.com/influxdata/influxdb/client/v2"
  14. "flag"
  15. "os"
  16. "os/signal"
  17. "syscall"
  18. )
  19.  
  20. type queue_map struct {
  21. date string
  22. data *bytes.Buffer
  23. }
  24.  
  25. const (
  26. TYPE_RTB = "rtb"
  27. layout = "2006-01-02T15:04:05-07:00"
  28. influx_db = ""
  29. influx_username = ""
  30. influx_password = ""
  31. influx_retention_policy = ""
  32. )
  33.  
  34. type Record struct {
  35. Date time.Time
  36. ResponseTime float64
  37. ResponseCode int
  38. URI string
  39. Type string
  40. SocketRRT string
  41. }
  42.  
  43. type Report struct {
  44. URI string
  45. Time time.Time
  46. Type string
  47. Percentile float64
  48. Value float64
  49. Count float64
  50. }
  51.  
  52. var (
  53. logs = make(chan queue_map, 0)
  54. seek int64
  55. Host string
  56. Log_file = flag.String("log-file", "/var/log/nginx/upstream.log", "Input log file")
  57. Measurement = flag.String("measurement", "delivery_response_time", "InfluxDB Measurement")
  58. Seek_info = flag.String("seek-file", "/tmp/delivery_log_seek.info", "File contain seek info")
  59. Log_type = flag.String("type", "delivery", "Log type { delivery, rtb }")
  60. )
  61.  
  62.  
  63. func FloatToString(input_num float64) string {
  64. return strconv.FormatFloat(input_num, 'f', 0, 64)
  65. }
  66.  
  67. func write_last_pos() {
  68. fo, err := os.Create(*Seek_info)
  69. if err != nil {
  70. fmt.Println(err)
  71. return
  72. }
  73. defer fo.Close()
  74. _, err = fmt.Fprint(fo, seek)
  75. if err != nil {
  76. fmt.Println(err)
  77. }
  78. }
  79.  
  80. func wait() {
  81. defer func() {
  82. if err := recover(); err != nil {
  83. fmt.Println(err)
  84. }
  85. write_last_pos()
  86. if err := c.Close(); err != nil {
  87. fmt.Println(err)
  88. }
  89. os.Exit(1)
  90. }()
  91.  
  92. ch := make(chan os.Signal)
  93. signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  94.  
  95. glog.Info(<-ch, " signal catched, starting close services")
  96.  
  97. panic("SIGNAL Catch")
  98. }
  99.  
  100.  
  101. func do_report(report []Report) {
  102. //c, err := client.NewHTTPClient(client.HTTPConfig{
  103. // Addr: "http://monitor-adex.rtty.in:8086",
  104. // Username: influx_username,
  105. // Password: influx_password,
  106. // InsecureSkipVerify: true,
  107. // Timeout: time.Second,
  108. //})
  109. //if err != nil {
  110. //fmt.Println("Error: ", err)
  111. //}
  112. bp, err := client.NewBatchPoints(client.BatchPointsConfig{
  113. Database: influx_db,
  114. Precision: "s",
  115. RetentionPolicy: influx_retention_policy,
  116. })
  117. if err != nil {
  118. fmt.Println("Error: ", err)
  119. }
  120. Host, err = os.Hostname()
  121. if err != nil {
  122. fmt.Println("hostname error", err)
  123. }
  124. for _, v := range report {
  125. tags := map[string]string {
  126. "type": v.Type,
  127. "uri": v.URI,
  128. "percentile": FloatToString(v.Percentile),
  129. "host": Host,
  130. }
  131. fields := map[string]interface{}{
  132. "response_time": v.Value,
  133. "count": v.Count,
  134. }
  135. pt, err := client.NewPoint(*Measurement, tags, fields, v.Time)
  136. if err != nil {
  137. fmt.Println("Error: ", err)
  138. }
  139. bp.AddPoint(pt)
  140. }
  141. err = c.Write(bp)
  142. if err != nil {
  143. fmt.Println(err)
  144. }
  145. // c.Close()
  146. }
  147.  
  148. func read_delivery() {
  149. for {
  150.  
  151. if (logs != nil) {
  152. var records []Record
  153. var report []Report
  154. input := <-logs
  155. scanner := bufio.NewScanner(input.data)
  156. var tm time.Time
  157. for scanner.Scan() {
  158. data_array := strings.Split(scanner.Text(), ";")
  159. if len(data_array) < 6 {
  160. continue
  161. }
  162. if records == nil {
  163. records = make([]Record, 0, 100)
  164. }
  165. date, err := time.Parse(layout, data_array[0])
  166. if err != nil {
  167. //fmt.Printf("Parse error delivery: %v\n", err)
  168. fmt.Printf("Parse error delivery (%+v)(%+v):\t%+v\n", scanner.Text(), data_array[0], err)
  169. continue
  170. }
  171. tm = date
  172. duration_s, err := strconv.ParseFloat(data_array[1], 0)
  173. if err != nil {
  174. fmt.Printf("Parse float error: %v\n", err)
  175. continue
  176. }
  177. duration := duration_s * 1000
  178. code, err := strconv.Atoi(data_array[2])
  179. if err != nil {
  180. fmt.Printf("Convert error: %v\n", err)
  181. continue
  182. }
  183. uri := data_array[3]
  184. var recordType string
  185. switch data_array[4] {
  186. case "1":
  187. recordType = "rtb"
  188. default:
  189. recordType = "delivery"
  190. }
  191.  
  192. socketRRT := data_array[5]
  193.  
  194. records = append(records, Record{
  195. Date: date,
  196. ResponseTime: duration,
  197. ResponseCode: code,
  198. URI: uri,
  199. Type: recordType,
  200. SocketRRT: socketRRT,
  201. })
  202.  
  203. }
  204.  
  205. if len(records) == 0 {
  206. continue
  207. }
  208. groupByURI := make(map[string][]Record)
  209. var recordsDelivery []Record
  210. for _, record := range records {
  211. recordsDelivery = append(recordsDelivery, record)
  212. values := groupByURI[record.URI]
  213. if values == nil {
  214. values = make([]Record, 0, 10)
  215. }
  216. values = append(values, record)
  217. groupByURI[record.URI] = values
  218. }
  219. for k, v := range groupByURI {
  220. var rtb_count float64
  221. var delivery_count float64
  222. var rt []float64
  223. var rt_rtb []float64
  224. delivery_count = 0
  225. rtb_count = 0
  226. for _, a := range v {
  227. if a.Type == TYPE_RTB {
  228. rt_rtb = append(rt_rtb, a.ResponseTime)
  229. rtb_count++
  230. continue
  231. }
  232. rt = append(rt, a.ResponseTime)
  233. delivery_count++
  234. }
  235.  
  236. for _, percentile_k := range []float64{50,90,95,96,97,98,99} {
  237. rt_val, _ := stats.Percentile(rt, percentile_k)
  238. rt_rtb_val, _ := stats.Percentile(rt_rtb, percentile_k)
  239. if len(rt) > 0 {
  240. report = append(report, Report{
  241. k,
  242. tm,
  243. "delivery",
  244. percentile_k,
  245. rt_val,
  246. delivery_count,
  247. })
  248. }
  249. if len(rt_rtb) > 0 {
  250. report = append(report, Report{
  251. k,
  252. tm,
  253. "rtb",
  254. percentile_k,
  255. rt_rtb_val,
  256. rtb_count,
  257. })
  258. }
  259. }
  260. }
  261. do_report(report)
  262. time.Tick(time.Millisecond * 10)
  263. }
  264. }
  265. }
  266.  
  267. func read_other() {
  268. recordType := "other"
  269. for {
  270.  
  271. if (logs != nil) {
  272. var records []Record
  273. var report []Report
  274. input := <-logs
  275. scanner := bufio.NewScanner(input.data)
  276. var tm time.Time
  277. for scanner.Scan() {
  278. data_array := strings.Split(scanner.Text(), ";")
  279. if len(data_array) < 6 {
  280. continue
  281. }
  282. if records == nil {
  283. records = make([]Record, 0, 100)
  284. }
  285. date, err := time.Parse(layout, data_array[0])
  286. if err != nil {
  287. fmt.Printf("Parse error: %v\n", err)
  288. continue
  289. }
  290. tm = date
  291. duration_s, err := strconv.ParseFloat(data_array[1], 0)
  292. if err != nil {
  293. fmt.Printf("Parse float error: %v\n", err)
  294. continue
  295. }
  296. duration := duration_s * 1000
  297. code, err := strconv.Atoi(data_array[2])
  298. if err != nil {
  299. fmt.Printf("Convert error: %v\n", err)
  300. continue
  301. }
  302. uri := data_array[3]
  303.  
  304. socketRRT := data_array[5]
  305.  
  306. records = append(records, Record{
  307. Date: date,
  308. ResponseTime: duration,
  309. ResponseCode: code,
  310. URI: uri,
  311. Type: recordType,
  312. SocketRRT: socketRRT,
  313. })
  314.  
  315. }
  316.  
  317. if len(records) == 0 {
  318. continue
  319. }
  320. groupByURI := make(map[string][]Record)
  321. var recordsOther []Record
  322. for _, record := range records {
  323. recordsOther = append(recordsOther, record)
  324. values := groupByURI[record.URI]
  325. if values == nil {
  326. values = make([]Record, 0, 10)
  327. }
  328. values = append(values, record)
  329. groupByURI[record.URI] = values
  330. }
  331. for k, v := range groupByURI {
  332. var rt []float64
  333. var count float64
  334.  
  335. for _, a := range v {
  336. rt = append(rt, a.ResponseTime)
  337. count++
  338. }
  339.  
  340. for _, percentile_k := range []float64{50,90,95,96,97,98,99} {
  341. rt_val, _ := stats.Percentile(rt, percentile_k)
  342. if len(rt) > 0 {
  343. report = append(report, Report{
  344. k,
  345. tm,
  346. recordType,
  347. percentile_k,
  348. rt_val,
  349. count,
  350. })
  351. }
  352. }
  353. }
  354. do_report(report)
  355. }
  356. }
  357. }
  358.  
  359. var c client.Client
  360.  
  361. func main() {
  362. flag.Parse()
  363.  
  364. var err error
  365. c, err = client.NewHTTPClient(client.HTTPConfig{
  366. Addr: "http://monitor-adex.rtty.in:8086",
  367. Username: influx_username,
  368. Password: influx_password,
  369. InsecureSkipVerify: true,
  370. Timeout: time.Second,
  371. })
  372. if err != nil {
  373. fmt.Println("Error: ", err)
  374. return
  375. }
  376.  
  377. var (
  378. date string
  379. cur_date string
  380. data *bytes.Buffer
  381. )
  382.  
  383. defer func() {
  384. if err := recover(); err != nil {
  385. fmt.Println(err)
  386. }
  387. write_last_pos()
  388. if err := c.Close(); err != nil {
  389. fmt.Println(err)
  390. }
  391. os.Exit(1)
  392. }()
  393. go wait()
  394.  
  395. if ( *Log_type == "delivery" ) {
  396. go read_delivery()
  397. } else {
  398. go read_other()
  399. }
  400.  
  401.  
  402.  
  403. fi, err := os.Open(*Seek_info)
  404. if err != nil {
  405. write_last_pos()
  406. fi, err = os.Open(*Seek_info)
  407. if err != nil {
  408. fmt.Println(err)
  409. return
  410. }
  411. }
  412. scanner := bufio.NewScanner(fi)
  413. var read_pos string
  414. for scanner.Scan() {
  415. read_pos = scanner.Text()
  416. }
  417. seek_pos, _ := strconv.ParseInt(read_pos, 0, 64)
  418. fi.Close()
  419.  
  420. config := tail.Config{
  421. Poll: true,
  422. Follow: true,
  423. ReOpen: true,
  424. Location: &tail.SeekInfo{seek_pos, 0},
  425. }
  426.  
  427. t, err := tail.TailFile(*Log_file, config)
  428. if err != nil {
  429. fmt.Printf("trouble while tail file %v\n", err)
  430. return
  431. }
  432.  
  433.  
  434. n := 1
  435. for line := range t.Lines {
  436. if len(line.Text) == 0 {
  437. continue
  438. }
  439. cur_date = strings.Split(line.Text, ";")[0]
  440. _, err := time.Parse(layout, cur_date)
  441. if err != nil {
  442. fmt.Printf("Parse error main: %v\n", err)
  443. continue
  444. }
  445. if (len(strings.Split(line.Text, ";")) < 6) {
  446. continue
  447. }
  448.  
  449.  
  450.  
  451. if (cur_date != date) {
  452. logs <- queue_map{date, data}
  453. data.Reset()
  454. n++
  455. date = cur_date
  456. }
  457.  
  458. data.WriteString(line.Text + "\n")
  459. seek, err = t.Tell()
  460. if err != nil {
  461. fmt.Printf("Error: %v\n", err)
  462. continue
  463. }
  464. }
  465.  
  466. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement