Skip to content

Instantly share code, notes, and snippets.

@marekgalovic
Created September 7, 2018 08:12
Show Gist options
  • Save marekgalovic/d2c7769e5039086bdb7a8530fa29449a to your computer and use it in GitHub Desktop.
Save marekgalovic/d2c7769e5039086bdb7a8530fa29449a to your computer and use it in GitHub Desktop.
Etcd Raft consensus protocol local example
package main
import (
"os";
"os/signal";
"syscall";
"time";
"context";
"math";
"bytes";
"go.etcd.io/etcd/raft";
"go.etcd.io/etcd/raft/raftpb";
log "github.com/Sirupsen/logrus";
)
var nodes = make(map[uint64]*node)
type node struct {
config *raft.Config
raft raft.Node
storage *raft.MemoryStorage
kv map[string]string
}
func newNode(id int, peers []raft.Peer) *node {
storage := raft.NewMemoryStorage()
c := &raft.Config {
ID: uint64(id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: math.MaxUint16,
MaxInflightMsgs: 256,
}
n := &node {
config: c,
raft: raft.StartNode(c, peers),
storage: storage,
kv: make(map[string]string),
}
nodes[c.ID] = n
return n
}
func(n *node) run(ctx context.Context) {
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <- ticker.C:
n.raft.Tick()
case rd := <- n.raft.Ready():
n.save(rd.HardState, rd.Entries, rd.Snapshot)
n.send(ctx, rd.Messages)
for _, entry := range rd.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
n.process(entry)
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
n.raft.ApplyConfChange(cc)
}
}
n.raft.Advance()
case <- ctx.Done():
return
}
}
}
func (n *node) save(hardState raftpb.HardState, entries []raftpb.Entry, snapshot raftpb.Snapshot) {
n.storage.Append(entries)
if !raft.IsEmptyHardState(hardState) {
n.storage.SetHardState(hardState)
}
if !raft.IsEmptySnap(snapshot) {
n.storage.ApplySnapshot(snapshot)
}
}
func (n *node) process(entry raftpb.Entry) {
if entry.Data == nil {
return
}
kv := bytes.SplitN(entry.Data, []byte(":"), 2)
n.kv[string(kv[0])] = string(kv[1])
}
func (n *node) send(ctx context.Context, messages []raftpb.Message) {
for _, m := range messages {
if _, exists := nodes[m.To]; !exists {
continue
}
nodes[m.To].receive(ctx, m)
}
}
func (n *node) receive(ctx context.Context, message raftpb.Message) {
n.raft.Step(ctx, message)
}
func main() {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
nodeA := newNode(1, []raft.Peer{{ID: 1}, {ID: 2}})
go nodeA.run(ctx)
nodeB := newNode(2, []raft.Peer{{ID: 1}, {ID: 2}})
go nodeB.run(ctx)
nodeC := newNode(3, nil)
go nodeC.run(ctx)
nodeB.raft.ProposeConfChange(ctx, raftpb.ConfChange {
ID: 3,
Type: raftpb.ConfChangeAddNode,
NodeID: 3,
})
log.Info(nodeA.raft.Propose(ctx, []byte("foo:bar")))
log.Info(nodeB.raft.Propose(ctx, []byte("foo:baz")))
log.Info(nodeC.raft.Propose(ctx, []byte("foo:bag")))
time.Sleep(100 * time.Millisecond)
for _, node := range nodes {
for k, v := range node.kv {
log.Infof("Node: %d, %s -> %s", node.config.ID, k, v)
}
}
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
<- wait
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment