Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "bufio"
- "bytes"
- _ "github.com/go-sql-driver/mysql"
- "database/sql"
- "flag"
- "fmt"
- "io"
- "log"
- "os"
- "strings"
- )
// msgs is the queue of complete queries handed from the producer to the
// consumers. It is buffered so that up to 1,000,000 queries can be pending.
var msgs = make(chan string, 1000000)

// done receives one value from every consumer once that consumer has
// finished.
var done = make(chan bool)

// MySQL connection settings; main fills them in from the command-line flags.
var db_host, db_user, db_password, db_name, db_charset string
- func produce() {
- r := bufio.NewReaderSize(io.Reader(os.Stdin), 32*1024*1024)
- var query bytes.Buffer
- for {
- line, err := r.ReadString('\n')
- if err == nil {
- // Based on pt-query-digest's output:
- // Start of a new '#' block indicates the previous query has
- // ended. This is slightly more robust than looking for a trailing
- // semi-colon, in case the query doesn't end in one.
- if len(line) > 0 && line[0] == '#' {
- s := strings.TrimSpace(query.String())
- if len(s) > 1 {
- select {
- case msgs <- s:
- /* nothing */
- default:
- log.Println("Channel full; dropping message")
- }
- }
- query.Reset()
- } else {
- query.WriteString(" " + line)
- }
- continue
- }
- if err != nil && err != io.EOF {
- fmt.Println("Error reading from stdin: " + err.Error())
- break
- }
- if err != nil && err == io.EOF {
- break
- }
- }
- fmt.Println("Done with produce")
- close(msgs)
- }
- func consume() {
- defer func() {
- done <- true
- }()
- for {
- select {
- case msg, ok := <-msgs:
- if !ok {
- return
- }
- // Creating a new connection for every query. Trying to exercise MySQL's connection
- // handling a bit
- db, e := sql.Open("mysql", db_user+":"+db_password+"@tcp("+db_host+":3306)/"+db_name+"?charset="+db_charset+"&multiStatements=true"
- if e != nil {
- panic(e)
- }
- _, err := db.Exec(msg)
- if err != nil {
- log.Println(msg)
- log.Println(err.Error())
- }
- db.Close()
- }
- }
- }
- func main() {
- var threads = flag.Int("threads", 8, "Execution threads")
- flag.StringVar(&db_host, "db-host", "", "DB Host")
- flag.StringVar(&db_user, "db-user", "", "DB Username")
- flag.StringVar(&db_password, "db-password", "", "DB Password")
- flag.StringVar(&db_name, "db-name", "", "DB Name")
- flag.StringVar(&db_charset, "db-charset", "", "DB Character set")
- var error_log = flag.String("log", "", "Error log file location and name")
- var help = flag.Bool("h", false, "Help")
- flag.Parse()
- if *help == true {
- flag.Usage()
- return
- }
- if db_host == "" || db_user == "" || db_password == "" || db_name == "" || db_charset == "" || *error_log == "" {
- flag.Usage()
- return
- }
- logFile, err := os.OpenFile(*error_log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
- if err != nil {
- panic(err.Error() + " opening log file: " + *error_log)
- }
- log.SetOutput(logFile)
- log.SetFlags(log.LstdFlags)
- fmt.Println(fmt.Sprintf("Starting go-execution with %d threads", *threads))
- go produce()
- for i := 0; i < *threads; i++ {
- go consume()
- }
- for i := 0; i < *threads; i++ {
- <-done
- }
- fmt.Println("Done with consume")
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement