Created
October 9, 2017 05:17
-
-
Save dantin/f8cc974c976a51241f89694533067f43 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"flag" | |
"fmt" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
_ "github.com/go-sql-driver/mysql" | |
"github.com/ngaut/log" | |
"github.com/siddontang/go/sync2" | |
"golang.org/x/net/context" | |
) | |
// longForm is the layout (in Go reference-time notation) used for full
// "date time" timestamps when building and parsing query bounds.
const longForm = "2006-01-02 15:04:05"

// shortForm is the date-only layout.
const shortForm = "2006-01-02"
var ( | |
host = flag.String("host", "10.30.1.4", "TiDB host") | |
port = flag.Int("port", 4000, "TiDB port") | |
username = flag.String("username", "root", "TiDB username") | |
password = flag.String("password", "", "TiDB password") | |
database = flag.String("database", "lock_event_log", "Database") | |
concurrency = flag.Int("concurrent", 10, "Concurrency") | |
closed sync2.AtomicBool | |
) | |
// timeSlot describes one unit of delete work: the half-open interval
// [fromPos, toPos) of create_time values to remove.
type timeSlot struct {
	fromPos, toPos time.Time
}
func openDB() (*sql.DB, error) { | |
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", *username, *password, *host, *port, *database) | |
db, err := sql.Open("mysql", dbDSN) | |
if err != nil { | |
return nil, err | |
} | |
db.SetMaxIdleConns(*concurrency) | |
return db, nil | |
} | |
func mustExec(db *sql.DB, query string, args ...interface{}) sql.Result { | |
r, err := db.Exec(query, args...) | |
if err != nil { | |
log.Fatalf("exec %s err %v", query, err) | |
} | |
return r | |
} | |
func getStartPos(db *sql.DB) string { | |
var ts sql.NullString | |
t, err := time.Parse(longForm, "2017-05-01 00:00:00") | |
if err != nil { | |
log.Fatal(err) | |
} | |
d, err := time.ParseDuration("24h") | |
if err != nil { | |
log.Fatal(err) | |
} | |
for { | |
query := fmt.Sprintf("SELECT MIN(create_time) FROM mbk_lock_event WHERE create_time <'%s'", t.Format(longForm)) | |
err = db.QueryRow(query).Scan(&ts) | |
if err != nil { | |
log.Fatal(err) | |
} | |
if ts.Valid { | |
return ts.String | |
} else { | |
t = t.Add(d) | |
} | |
} | |
} | |
func getStopPos() string { | |
t := time.Now().AddDate(0, 0, -120) | |
return fmt.Sprintf("%s 00:00:00", t.Format(shortForm)) | |
} | |
func main() { | |
flag.Parse() | |
db, err := openDB() | |
if err != nil { | |
log.Fatal(err) | |
} | |
closed.Set(false) | |
startPos := getStartPos(db) | |
stopPos := getStopPos() | |
log.Infof("Clean log from %s to %s.", startPos, stopPos) | |
ctx, cancel := context.WithCancel(context.Background()) | |
sc := make(chan os.Signal, 1) | |
signal.Notify(sc, | |
syscall.SIGHUP, | |
syscall.SIGINT, | |
syscall.SIGTERM, | |
syscall.SIGQUIT) | |
go func() { | |
sig := <-sc | |
log.Infof("Got signal [%d] to exit.", sig) | |
db.Close() | |
cancel() | |
closed.Set(true) | |
}() | |
jobChan := make(chan *timeSlot, *concurrency) | |
var wg sync.WaitGroup | |
wg.Add(*concurrency) | |
for i := 0; i < *concurrency; i++ { | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
// do job | |
for { | |
slot, ok := <-jobChan | |
if !ok { | |
return | |
} | |
start := time.Now() | |
mustExec(db, fmt.Sprintf("DELETE FROM mbk_lock_event WHERE create_time >= '%s' AND create_time < '%s'", slot.fromPos.Format(longForm), slot.toPos.Format(longForm))) | |
log.Infof("delete lock event log in [%s, %s), takes %s", slot.fromPos.Format(longForm), slot.toPos.Format(longForm), time.Now().Sub(start)) | |
} | |
} | |
} | |
}() | |
} | |
// add jobs | |
go func() { | |
d, _ := time.ParseDuration("1s") | |
curr, _ := time.Parse(longForm, startPos) | |
end, _ := time.Parse(longForm, stopPos) | |
for !closed.Get() && curr.Before(end) { | |
next := curr.Add(d) | |
jobChan <- &timeSlot{fromPos: curr, toPos: next} | |
curr = next | |
} | |
close(jobChan) | |
}() | |
wg.Wait() | |
log.Info("Done!") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment