Created
March 18, 2013 19:01
-
-
Save keyurdg/5189836 to your computer and use it in GitHub Desktop.
Read output from pt-query-digest and multiplex queries to MySQL over multiple threads.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
_ "github.com/Go-SQL-Driver/MySQL" | |
"database/sql" | |
"flag" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"strings" | |
) | |
var ( | |
// Actual messages | |
msgs = make(chan string, 1000000) | |
// Indicate when consumer has finishes | |
done = make(chan bool) | |
db_host string | |
db_user string | |
db_password string | |
db_name string | |
db_charset string | |
) | |
func produce() { | |
r := bufio.NewReaderSize(io.Reader(os.Stdin), 32*1024*1024) | |
var query bytes.Buffer | |
for { | |
line, err := r.ReadString('\n') | |
if err == nil { | |
// Based on pt-query-digest's output: | |
// Start of a new '#' block indicates the previous query has | |
// ended. This is slightly more robust than looking for a trailing | |
// semi-colon, in case the query doesn't end in one. | |
if len(line) > 0 && line[0] == '#' { | |
s := strings.TrimSpace(query.String()) | |
if len(s) > 1 { | |
select { | |
case msgs <- s: | |
/* nothing */ | |
default: | |
log.Println("Channel full; dropping message") | |
} | |
} | |
query.Reset() | |
} else { | |
query.WriteString(" " + line) | |
} | |
continue | |
} | |
if err != nil && err != io.EOF { | |
fmt.Println("Error reading from stdin: " + err.Error()) | |
break | |
} | |
if err != nil && err == io.EOF { | |
break | |
} | |
} | |
fmt.Println("Done with produce") | |
close(msgs) | |
} | |
func consume() { | |
defer func() { | |
done <- true | |
}() | |
for { | |
select { | |
case msg, ok := <-msgs: | |
if !ok { | |
return | |
} | |
// Creating a new connection for every query. Trying to exercise MySQL's connection | |
// handling a bit | |
db, e := sql.Open("mysql", db_user+":"+db_password+"@tcp("+db_host+":3306)/"+db_name+"?charset="+db_charset) | |
if e != nil { | |
panic(e) | |
} | |
_, err := db.Exec(msg) | |
if err != nil { | |
log.Println(msg) | |
log.Println(err.Error()) | |
} | |
db.Close() | |
} | |
} | |
} | |
func main() { | |
var threads = flag.Int("threads", 8, "Execution threads") | |
flag.StringVar(&db_host, "db-host", "", "DB Host") | |
flag.StringVar(&db_user, "db-user", "", "DB Username") | |
flag.StringVar(&db_password, "db-password", "", "DB Password") | |
flag.StringVar(&db_name, "db-name", "", "DB Name") | |
flag.StringVar(&db_charset, "db-charset", "", "DB Character set") | |
var error_log = flag.String("log", "", "Error log file location and name") | |
var help = flag.Bool("h", false, "Help") | |
flag.Parse() | |
if *help == true { | |
flag.Usage() | |
return | |
} | |
if db_host == "" || db_user == "" || db_password == "" || db_name == "" || db_charset == "" || *error_log == "" { | |
flag.Usage() | |
return | |
} | |
logFile, err := os.OpenFile(*error_log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) | |
if err != nil { | |
panic(err.Error() + " opening log file: " + *error_log) | |
} | |
log.SetOutput(logFile) | |
log.SetFlags(log.LstdFlags) | |
fmt.Println(fmt.Sprintf("Starting go-execution with %d threads", *threads)) | |
go produce() | |
for i := 0; i < *threads; i++ { | |
go consume() | |
} | |
for i := 0; i < *threads; i++ { | |
<-done | |
} | |
fmt.Println("Done with consume") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment