Created
June 13, 2017 22:33
-
-
Save bbeaudreault/ea0f42d62e55768699a32bec02ea72f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vitessutil | |
import ( | |
"fmt" | |
"encoding/json" | |
"time" | |
"github.com/pkg/errors" | |
jww "github.com/spf13/jwalterweatherman" | |
"github.com/youtube/vitess/go/vt/proto/logutil" | |
"github.com/youtube/vitess/go/vt/proto/topodata" | |
"github.com/youtube/vitess/go/vt/topo/topoproto" | |
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient" | |
) | |
// ResetReplication should only be run at keyspaceshard creation time, unless a good reason. It clears the master and slave | |
// state including GTID history. | |
func ResetReplication(vtctl vtctlclient.VtctlClient, alias string) error { | |
jww.INFO.Println("resetting replication on tablet", alias) | |
if _, err := executeSQL(vtctl, alias, true, "STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"); err != nil { | |
return err | |
} | |
return nil | |
} | |
// VerifyReplicationHealth checks the replication status of all passed tablets, verifying that readonly and slave status | |
// is correct. It also checks for errant gtids. | |
func VerifyReplicationHealth(vclient vtctlclient.VtctlClient, keyspace, shard string, tablets []Tablet) error { | |
keyspaceShard := topoproto.KeyspaceShardString(keyspace, shard) | |
// We do the replicas first to ensure that the master has a superset of gtids when we check it later | |
// if we checked the master first, sometimes due to semi-sync a slave may have 1 more gtid executed than the master, | |
// depending on where in the semi-sync flow we checked | |
jww.INFO.Println("getting replication status for replicas") | |
replicaStatuses, err := getReplicationStatusForReplicas(vclient, tablets) | |
if err != nil { | |
return err | |
} | |
jww.INFO.Println("getting replication status for master") | |
masterStatus, err := getMasterReplicationStatus(vclient, tablets) | |
if err != nil { | |
return errors.Errorf("failed to get status for master of keyspaceShard %v: %v", keyspaceShard, err) | |
} | |
if masterStatus.readOnly == 1 { | |
return errors.Errorf("master is in read only mode for keyspaceShard %v", keyspaceShard) | |
} | |
jww.INFO.Println("comparing GTIDSets") | |
for _, status := range replicaStatuses { | |
diff, err := verifyGtids(vclient, masterStatus.tablet.Alias, masterStatus.serverGtid, status.serverGtid) | |
if err != nil { | |
return errors.Errorf("failed to compare serverGtids for replica %v and master %v: %v", status.tablet.Fqdn, masterStatus.tablet.Fqdn, err) | |
} | |
if diff != "" { | |
return errors.Errorf("errant GTID detected for keyspaceShard %v on replica %v (%v)!\n\ndiff: %v \n\nreplicaUuid: %v\nreplicaGtd:\n%v\n\nmasterUuid: %v\nmasterGtid:\n%v", keyspaceShard, status.tablet.Alias, status.tablet.Fqdn, diff, status.serverUUID, status.serverGtid, masterStatus.serverUUID, masterStatus.serverGtid) | |
} | |
} | |
jww.INFO.Println("replication is healthy") | |
return nil | |
} | |
func getReplicationStatusForReplicas(vclient vtctlclient.VtctlClient, tablets []Tablet) ([]status, error) { | |
result := make([]status, 0, len(tablets)) | |
for _, tablet := range tablets { | |
if tablet.TabletType == topodata.TabletType_MASTER { | |
continue | |
} | |
status, err := getStatusForTablet(vclient, tablet) | |
if err != nil { | |
return nil, errors.Errorf("failed to get status for replica %v of keyspace=%v, shard=%v: %v", tablet.Alias, tablet.Keyspace, tablet.Shard, err) | |
} | |
if status.readOnly != 1 { | |
return nil, errors.Errorf("replica %v (%v) is NOT in read only mode for keyspace=%v, shard=%v ", tablet.Alias, tablet.Fqdn, tablet.Keyspace, tablet.Shard) | |
} | |
result = append(result, status) | |
} | |
return result, nil | |
} | |
func verifyGtids(vclient vtctlclient.VtctlClient, masterAlias, masterGtids, replicaGtids string) (string, error) { | |
resp, err := executeSQL(vclient, masterAlias, false, fmt.Sprintf("select GTID_SUBTRACT(\"%v\", \"%v\")", replicaGtids, masterGtids)) | |
if err != nil { | |
return "", errors.Wrap(err, "failed to compare gtidsets") | |
} | |
var result struct { | |
Rows [][]interface{} | |
} | |
if err := json.Unmarshal([]byte(resp[0]), &result); err != nil { | |
return "", errors.Wrap(err, "failed to unmarshal result from query") | |
} | |
return result.Rows[0][0].(string), nil | |
} | |
type status struct { | |
tablet Tablet | |
readOnly int | |
serverUUID string | |
serverGtid string | |
} | |
// ErrNotFound means we couldn't find a master for the given keyspace. | |
var ErrNotFound = errors.New("could not find a master for keyspace") | |
func getMasterReplicationStatus(vclient vtctlclient.VtctlClient, tablets []Tablet) (status, error) { | |
for _, tablet := range tablets { | |
if tablet.TabletType != topodata.TabletType_MASTER { | |
continue | |
} | |
return getStatusForTablet(vclient, tablet) | |
} | |
return status{}, ErrNotFound | |
} | |
func getStatusForTablet(vclient vtctlclient.VtctlClient, tablet Tablet) (status, error) { | |
resp, err := executeSQL(vclient, tablet.Alias, false, "select @@global.read_only as readOnly, @@global.server_uuid as serverUUID, @@global.gtid_executed as serverGtid") | |
if err != nil { | |
return status{}, errors.Wrap(err, "failed to get status for tablet") | |
} | |
var result struct { | |
Rows [][]interface{} | |
} | |
if err := json.Unmarshal([]byte(resp[0]), &result); err != nil { | |
return status{}, errors.Wrapf(err, "failed to unmarshal query result: %v", resp[0]) | |
} | |
return status{ | |
tablet: tablet, | |
readOnly: int(result.Rows[0][0].(float64)), | |
serverUUID: result.Rows[0][1].(string), | |
serverGtid: result.Rows[0][2].(string), | |
}, nil | |
} | |
func executeSQL(vtctl vtctlclient.VtctlClient, alias string, disableBinlog bool, queries ...string) ([]string, error) { | |
args := []string{ | |
"ExecuteFetchAsDba", | |
"-json", | |
} | |
if disableBinlog { | |
args = append(args, "-disable_binlogs") | |
} | |
args = append(args, alias) | |
resp := make([]string, 0, len(queries)) | |
for i, q := range queries { | |
if i == 0 { | |
args = append(args, q) | |
} else { | |
args[len(args)-1] = q | |
} | |
if err := RunVtctlCommand(vtctl, args, 5*time.Second, func(event *logutil.Event) error { | |
resp = append(resp, event.Value) | |
return nil | |
}); err != nil { | |
return nil, errors.Wrapf(err, "failed on query: %v", q) | |
} | |
} | |
return resp, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment