Skip to content

Instantly share code, notes, and snippets.

@MilosSimic
Forked from F21/raft.go
Created July 10, 2018 17:15
Show Gist options
  • Save MilosSimic/944509e45d059feeaea5f99f9d0a7e78 to your computer and use it in GitHub Desktop.
Save MilosSimic/944509e45d059feeaea5f99f9d0a7e78 to your computer and use it in GitHub Desktop.
Sample hashicorp/raft + hashicorp/serf app
package main
import (
"crypto/md5"
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
)
// Command-line options; registered in init and populated by flag.Parse in main.
var (
	// members is a comma-separated list of addresses of existing cluster
	// members to join through Serf; empty means no join is attempted.
	members string
	// serfPort is the port Serf binds to; main uses serfPort+1 for Raft.
	serfPort int
)
func init() {
flag.StringVar(&members, "members", "", "127.0.0.1:1111,127.0.0.1:2222")
flag.IntVar(&serfPort, "serfPort", 0, "1111")
}
// main starts a Serf agent for cluster membership and a Raft node on the same
// IP (Raft port = Serf port + 1), bootstraps Raft, and then loops forever:
// every three seconds it prints whether this node is the leader together with
// the servers in its Raft configuration, and on Serf membership events the
// leader adds or removes the corresponding Raft server.
//
// Any setup failure, and any failed membership change, terminates the process
// through log.Fatal.
func main() {
	flag.Parse()

	var peers []string
	if members != "" {
		peers = strings.Split(members, ",")
	}

	ip, err := GetFirstPrivateIP()
	if err != nil {
		log.Fatal(err)
	}

	// Serf delivers its events on this channel; they are consumed in the
	// loop at the bottom of main.
	serfEvents := make(chan serf.Event, 16)

	memberlistConfig := memberlist.DefaultLANConfig()
	memberlistConfig.BindAddr = ip
	memberlistConfig.BindPort = serfPort
	memberlistConfig.LogOutput = os.Stdout

	serfConfig := serf.DefaultConfig()
	serfConfig.NodeName = fmt.Sprintf("%s:%d", ip, serfPort)
	serfConfig.EventCh = serfEvents
	serfConfig.MemberlistConfig = memberlistConfig
	serfConfig.LogOutput = os.Stdout

	s, err := serf.Create(serfConfig)
	if err != nil {
		log.Fatal(err)
	}

	// Join an existing cluster by specifying at least one known member.
	if len(peers) > 0 {
		if _, err = s.Join(peers, false); err != nil {
			log.Fatal(err)
		}
	}

	workDir, err := os.Getwd()
	if err != nil {
		log.Fatal(err)
	}

	// Raft listens next to Serf: same IP, port one higher.
	raftPort := serfPort + 1
	raftAddr := ip + ":" + strconv.Itoa(raftPort)

	// The data directory is named after the MD5 of the Raft address and is
	// wiped on every start, so the node always begins with empty Raft state.
	id := fmt.Sprintf("%x", md5.Sum([]byte(raftAddr)))
	dataDir := filepath.Join(workDir, id)
	if err = os.RemoveAll(dataDir); err != nil {
		log.Fatal(err)
	}
	if err = os.MkdirAll(dataDir, 0777); err != nil {
		log.Fatal(err)
	}

	// The same Bolt store is passed to NewRaft below as both of its store
	// arguments.
	raftDB, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft.db"))
	if err != nil {
		log.Fatal(err)
	}

	snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 1, os.Stdout)
	if err != nil {
		log.Fatal(err)
	}

	trans, err := raft.NewTCPTransport(raftAddr, nil, 3, 10*time.Second, os.Stdout)
	if err != nil {
		log.Fatal(err)
	}

	c := raft.DefaultConfig()
	c.LogOutput = os.Stdout
	c.LocalID = raft.ServerID(raftAddr)

	r, err := raft.NewRaft(c, &fsm{}, raftDB, raftDB, snapshotStore, trans)
	if err != nil {
		log.Fatal(err)
	}

	// The bootstrap configuration always contains this node as a voter.
	bootstrapConfig := raft.Configuration{
		Servers: []raft.Server{
			{
				Suffrage: raft.Voter,
				ID:       raft.ServerID(raftAddr),
				Address:  raft.ServerAddress(raftAddr),
			},
		},
	}

	// Add known peers to bootstrap.
	// NOTE(review): peers holds the addresses passed to Serf's Join above,
	// whereas this node's Raft transport listens on serfPort+1. The entries
	// are used here unchanged as Raft IDs/addresses, so the node == raftAddr
	// check compares a Serf address with a Raft address. Confirm this is
	// intended before relying on it.
	for _, node := range peers {
		if node == raftAddr {
			continue
		}
		bootstrapConfig.Servers = append(bootstrapConfig.Servers, raft.Server{
			Suffrage: raft.Voter,
			ID:       raft.ServerID(node),
			Address:  raft.ServerAddress(node),
		})
	}

	if err := r.BootstrapCluster(bootstrapConfig).Error(); err != nil {
		log.Fatalf("error bootstrapping: %s", err)
	}

	ticker := time.NewTicker(3 * time.Second)
	for {
		select {
		case <-ticker.C:
			// Periodic status report: leadership plus the known Raft servers.
			future := r.VerifyLeader()
			fmt.Printf("Showing peers known by %s:\n", raftAddr)
			if err := future.Error(); err != nil {
				fmt.Println("Node is a follower")
			} else {
				fmt.Println("Node is leader")
			}

			cfuture := r.GetConfiguration()
			if err := cfuture.Error(); err != nil {
				log.Fatalf("error getting config: %s", err)
			}
			for _, server := range cfuture.Configuration().Servers {
				fmt.Println(server.Address)
			}

		case ev := <-serfEvents:
			memberEvent, ok := ev.(serf.MemberEvent)
			if !ok {
				continue
			}
			// Membership changes are only issued by the node that is
			// currently the verified leader; everyone else ignores the event.
			if r.VerifyLeader().Error() != nil {
				continue
			}

			for _, member := range memberEvent.Members {
				// Derive the member's Raft address from its Serf address.
				// The addition is done in int so that a uint16 port of 65535
				// does not wrap around to 0.
				changedPeer := member.Addr.String() + ":" + strconv.Itoa(int(member.Port)+1)

				switch memberEvent.EventType() {
				case serf.EventMemberJoin:
					f := r.AddVoter(raft.ServerID(changedPeer), raft.ServerAddress(changedPeer), 0, 0)
					// Report the future's own error, not an unrelated earlier one.
					if err := f.Error(); err != nil {
						log.Fatalf("error adding voter: %s", err)
					}
				case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
					f := r.RemoveServer(raft.ServerID(changedPeer), 0, 0)
					if err := f.Error(); err != nil {
						log.Fatalf("error removing server: %s", err)
					}
				}
			}
		}
	}
}
// fsm is a no-op implementation of the state machine handed to raft.NewRaft:
// it keeps no state, ignores applied log entries, and has nothing to
// snapshot or restore.
type fsm struct {
}

// Apply ignores the log entry and returns nil.
func (f *fsm) Apply(*raft.Log) interface{} {
	return nil
}

// Snapshot returns an empty snapshot. It deliberately returns a non-nil
// FSMSnapshot: a nil snapshot paired with a nil error would be dereferenced
// by the caller when it persists and releases the snapshot.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	return emptySnapshot{}, nil
}

// Restore does nothing and returns nil, because the FSM holds no state. The
// reader is left untouched.
func (f *fsm) Restore(io.ReadCloser) error {
	return nil
}

// emptySnapshot is the FSMSnapshot returned by fsm.Snapshot; it carries no data.
type emptySnapshot struct{}

// Persist writes nothing and closes the sink to mark the snapshot complete.
func (emptySnapshot) Persist(sink raft.SnapshotSink) error {
	return sink.Close()
}

// Release is a no-op; the snapshot holds no resources.
func (emptySnapshot) Release() {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment