Advertisement
Guest User

Untitled

a guest
Aug 2nd, 2016
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.88 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "bufio"
  5. "bytes"
  6. _ "github.com/go-sql-driver/mysql"
  7. "database/sql"
  8. "flag"
  9. "fmt"
  10. "io"
  11. "log"
  12. "os"
  13. "strings"
  14. )
  15.  
  16. var (
  17. // Actual messages
  18. msgs = make(chan string, 1000000)
  19.  
  20. // Indicate when consumer has finishes
  21. done = make(chan bool)
  22.  
  23. db_host string
  24. db_user string
  25. db_password string
  26. db_name string
  27. db_charset string
  28. )
  29.  
  30. func produce() {
  31. r := bufio.NewReaderSize(io.Reader(os.Stdin), 32*1024*1024)
  32. var query bytes.Buffer
  33. for {
  34. line, err := r.ReadString('\n')
  35. if err == nil {
  36. // Based on pt-query-digest's output:
  37. // Start of a new '#' block indicates the previous query has
  38. // ended. This is slightly more robust than looking for a trailing
  39. // semi-colon, in case the query doesn't end in one.
  40. if len(line) > 0 && line[0] == '#' {
  41. s := strings.TrimSpace(query.String())
  42. if len(s) > 1 {
  43. select {
  44. case msgs <- s:
  45. /* nothing */
  46. default:
  47. log.Println("Channel full; dropping message")
  48. }
  49. }
  50. query.Reset()
  51. } else {
  52. query.WriteString(" " + line)
  53. }
  54. continue
  55. }
  56. if err != nil && err != io.EOF {
  57. fmt.Println("Error reading from stdin: " + err.Error())
  58. break
  59. }
  60. if err != nil && err == io.EOF {
  61. break
  62. }
  63. }
  64. fmt.Println("Done with produce")
  65. close(msgs)
  66. }
  67.  
  68. func consume() {
  69. defer func() {
  70. done <- true
  71. }()
  72.  
  73. for {
  74. select {
  75. case msg, ok := <-msgs:
  76. if !ok {
  77. return
  78. }
  79.  
  80. // Creating a new connection for every query. Trying to exercise MySQL's connection
  81. // handling a bit
  82. db, e := sql.Open("mysql", db_user+":"+db_password+"@tcp("+db_host+":3306)/"+db_name+"?charset="+db_charset+"&multiStatements=true"
  83. if e != nil {
  84. panic(e)
  85. }
  86.  
  87. _, err := db.Exec(msg)
  88. if err != nil {
  89. log.Println(msg)
  90. log.Println(err.Error())
  91. }
  92. db.Close()
  93. }
  94. }
  95.  
  96. }
  97.  
  98. func main() {
  99. var threads = flag.Int("threads", 8, "Execution threads")
  100. flag.StringVar(&db_host, "db-host", "", "DB Host")
  101. flag.StringVar(&db_user, "db-user", "", "DB Username")
  102. flag.StringVar(&db_password, "db-password", "", "DB Password")
  103. flag.StringVar(&db_name, "db-name", "", "DB Name")
  104. flag.StringVar(&db_charset, "db-charset", "", "DB Character set")
  105. var error_log = flag.String("log", "", "Error log file location and name")
  106. var help = flag.Bool("h", false, "Help")
  107. flag.Parse()
  108.  
  109. if *help == true {
  110. flag.Usage()
  111. return
  112. }
  113.  
  114. if db_host == "" || db_user == "" || db_password == "" || db_name == "" || db_charset == "" || *error_log == "" {
  115. flag.Usage()
  116. return
  117. }
  118.  
  119. logFile, err := os.OpenFile(*error_log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  120. if err != nil {
  121. panic(err.Error() + " opening log file: " + *error_log)
  122. }
  123. log.SetOutput(logFile)
  124. log.SetFlags(log.LstdFlags)
  125.  
  126. fmt.Println(fmt.Sprintf("Starting go-execution with %d threads", *threads))
  127. go produce()
  128. for i := 0; i < *threads; i++ {
  129. go consume()
  130. }
  131.  
  132. for i := 0; i < *threads; i++ {
  133. <-done
  134. }
  135.  
  136. fmt.Println("Done with consume")
  137. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement