Skip to content

Instantly share code, notes, and snippets.

@rgarcia
Created May 13, 2015 04:00
Show Gist options
  • Save rgarcia/1bb6d65b94732a4dda22 to your computer and use it in GitHub Desktop.
Save rgarcia/1bb6d65b94732a4dda22 to your computer and use it in GitHub Desktop.
program that runs a mongo replset and periodically reconfigures it
package main
import (
"fmt"
"io"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/facebookgo/mgotest"
"github.com/kr/pretty"
"gopkg.in/mgo.v2/bson"
)
// ReplSetStatus is the result of running rs.status()
type ReplSetStatus struct {
Ok float64 `bson:"ok,omitempty"`
Set string `bson:"set,omitempty"`
Date time.Time `bson:"date,omitempty"`
MyState int64 `bson:"myState,omitempty"`
Members []struct {
Self bool `bson:"self,omitempty"`
ID int64 `bson:"_id"`
Health float64 `bson:"health,omitempty"`
StateStr string `bson:"stateStr,omitempty"`
Uptime int64 `bson:"uptime,omitempty"`
Optime time.Time `bson:"optime,omitempty"`
OptimeDate time.Time `bson:"optimeDate,omitempty"`
Name string `bson:"name,omitempty"`
State int64 `bson:"state,omitempty"`
} `bson:"members,omitempty"`
}
// ReplSetConfig is the result of running rs.config()
type ReplSetConfig struct {
ID string `bson:"_id"`
Version int64 `bson:"version,omitempty"`
Members []ReplSetConfigMember `bson:"members,omitempty"`
Settings struct {
GetLastErrorDefaults bson.M `bson:"getLastErrorDefaults,omitempty"`
ChainingAllowed bool `bson:"chainingAllowed,omitempty"`
GetLastErrorModes bson.M `bson:"getLastErrorModes,omitempty"`
HeartbeatTimeoutSecs int64 `bson:"heartbeatTimeoutSecs,omitempty"`
}
}
type ReplSetConfigMember struct {
ID int64 `bson:"_id"`
Host string `bson:"host,omitempty"`
ArbiterOnly bool `bson:"arbiterOnly,omitempty"`
BuildIndexes bool `bson:"buildIndexes,omitempty"`
Hidden bool `bson:"hidden,omitempty"`
Priority int64 `bson:"priority,omitempty"`
Tags bson.M `bson:"tags,omitempty"`
SlaveDelay int64 `bson:"slaveDelay,omitempty"`
Votes int64 `bson:"votes,omitempty"`
}
func toBSONM(input interface{}) bson.M {
marshalled, _ := bson.Marshal(input)
var result bson.M
_ = bson.Unmarshal(marshalled, &result)
return result
}
func rsStatus(replset *mgotest.ReplicaSet) (ReplSetStatus, error) {
session := replset.Session()
defer session.Close()
var status ReplSetStatus
return status, session.DB("admin").Run(bson.M{"replSetGetStatus": 1}, &status)
}
func rsConfig(replset *mgotest.ReplicaSet) (ReplSetConfig, error) {
session := replset.Session()
defer session.Close()
var config ReplSetConfig
// in 2.6 there is no replSetConfig db command (3.0 has it)
// so instead use the system.replset collection which yields an equivalent response
return config, session.DB("local").C("system.replset").Find(nil).One(&config)
}
func rsReconfig(replset *mgotest.ReplicaSet, newconfig bson.M) error {
session := replset.Session()
defer session.Close()
// io.EOF (disconnect) is expected in the case of removing
if err := session.DB("admin").Run(bson.M{"replSetReconfig": newconfig}, nil); !(err == io.EOF || err == nil) {
return fmt.Errorf("expected io.EOF running reconfig, got: %s", err)
}
return nil
}
func rsReconfigRandomly(replset *mgotest.ReplicaSet) error {
session := replset.Session()
defer session.Close()
status, err := rsStatus(replset)
if err != nil {
return fmt.Errorf("error running rs.status: %s", err)
}
log.Printf("current replset status: %# v", pretty.Formatter(status))
config, err := rsConfig(replset)
if err != nil {
return fmt.Errorf("error running rs.config: %s", err)
}
log.Printf("current replset config: %# v", pretty.Formatter(config))
// either add a member or remove a member
newConfig := config
newConfig.Version++
if fullStrengthSize := len(replset.Servers); len(config.Members) == fullStrengthSize {
newConfig.Members = newConfig.Members[:len(newConfig.Members)-1]
} else {
// figure out which host is missing
host := ""
for _, runningserver := range replset.Servers {
inreplset := false
for _, replsetmember := range newConfig.Members {
if replsetmember.Host == runningserver.URL() {
inreplset = true
break
}
}
if !inreplset {
host = runningserver.URL()
}
}
if host == "" {
return fmt.Errorf("could not determine which server to add back into the replset")
}
newConfig.Members = append(newConfig.Members, ReplSetConfigMember{
ID: int64(fullStrengthSize - 1),
Host: host,
})
}
// run rs.reconfig
newConfigBSONM := toBSONM(newConfig)
log.Printf("new replset config: %# v", pretty.Formatter(newConfigBSONM))
if err := rsReconfig(replset, newConfigBSONM); err != nil {
return fmt.Errorf("error running rs.reconfig: %s", err)
}
return nil
}
func main() {
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
for _ = range time.Tick(5 * time.Second) {
log.Print("hit ctrl-c to exit at any time")
}
}()
go func() {
sig := <-sigs
done <- true
}()
// create a replset
replset := mgotest.NewReplicaSet(3, log.New(os.Stdout, "mgotest", log.Flags()))
defer replset.Stop()
log.Printf("replset running:")
for i, server := range replset.Servers {
log.Printf("%d: %s", i+1, server.URL())
}
// reconfig it every so often
go func() {
for _ = range time.Tick(10 * time.Second) {
if err := rsReconfigRandomly(replset); err != nil {
log.Printf("error reconfiging randomly: %s", err)
}
}
}()
<-done
fmt.Println("exiting")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment