Skip to content

Instantly share code, notes, and snippets.

@Preetam
Created May 1, 2016 20:01
Show Gist options
  • Save Preetam/75bf55d0069cec83ccaf546fe3c1076e to your computer and use it in GitHub Desktop.
Save Preetam/75bf55d0069cec83ccaf546fe3c1076e to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"strings"
"sync"
"github.com/Preetam/libab/go/ab"
"github.com/VividCortex/siesta"
)
// Package-level configuration and replicated state shared by the libab
// callbacks on handler, the HTTP routes registered in main, and
// fetchSnapshots.
var (
	// encryptionKey is filled in from the -key flag and passed to node.SetKey.
	encryptionKey string

	// cluster maps each node ID to the address handed to ab.NewNode (for this
	// node) or node.AddPeer (for the others).
	cluster = map[uint64]string{
		10: "127.0.0.1:2010",
		20: "127.0.0.1:2020",
		30: "127.0.0.1:2030",
	}

	// clusterHTTP maps each node ID to the base URL of that node's HTTP API.
	clusterHTTP = map[uint64]string{
		10: "http://127.0.0.1:8010",
		20: "http://127.0.0.1:8020",
		30: "http://127.0.0.1:8030",
	}

	// id is this process's node ID, filled in from the -id flag.
	id uint64

	// node is created in main by ab.NewNode.
	node *ab.Node

	// lock is held by the callbacks and HTTP routes while they read or write
	// state, stateCommit, isLeader, currentLeader and the pending* variables.
	lock sync.Mutex

	// state is the key/value data served by GET / and GET /snapshot;
	// stateCommit is the commit number stored alongside it.
	state       = make(map[string]string)
	stateCommit uint64

	// isLeader is set by GainedLeadership and cleared by LostLeadership.
	// currentLeader is the ID last passed to OnLeaderChange; both leadership
	// callbacks reset it to 0.
	isLeader      bool
	currentLeader uint64

	// pendingState, pendingRound and pendingCommit describe the most recent
	// append; OnCommit promotes pendingState to state when the round and
	// commit it is given match the pending ones.
	pendingState  = make(map[string]string)
	pendingRound  uint64
	pendingCommit uint64
)
// Snapshot is the JSON document written by the GET /snapshot route and decoded
// by fetchSnapshots.
type Snapshot struct {
// Data is the key/value state (the package-level state map when served).
Data map[string]string `json:"data"`
// Commit is the commit number that goes with Data (stateCommit when served).
Commit uint64 `json:"commit"`
}
// handler is the field-less callback receiver passed to ab.NewNode in main;
// its methods read and write the package-level state under lock.
type handler struct{}
// OnLeaderChange prints the given leader ID and then stores it in
// currentLeader while holding lock.
func (h handler) OnLeaderChange(leaderID uint64) {
	fmt.Println("OnLeaderChange", leaderID)

	lock.Lock()
	defer lock.Unlock()
	currentLeader = leaderID
}
// OnAppend records an appended entry as the pending state and confirms it.
//
// data is expected to be the JSON encoding of a map[string]string (this is
// what the POST / route passes to node.Append). It is decoded into a fresh
// map rather than into pendingState: json decoding into an existing map keeps
// the entries already in it, which would leak keys from earlier appends into
// this one and, because OnCommit may have published that same map as state,
// could change the committed state before this entry is committed.
//
// If data cannot be decoded the error is logged, the pending variables are
// left untouched, and the append is not confirmed.
func (h handler) OnAppend(round uint64, commit uint64, data string) {
	fmt.Println("append:", data)

	next := map[string]string{}
	if err := json.NewDecoder(strings.NewReader(data)).Decode(&next); err != nil {
		log.Println("OnAppend: decoding appended state:", err)
		return
	}

	lock.Lock()
	defer lock.Unlock()
	pendingState = next
	pendingCommit = commit
	pendingRound = round
	node.ConfirmAppend(round, commit)
}
// OnCommit promotes the pending state to the committed state when round and
// commit match pendingRound and pendingCommit. If, after that, stateCommit
// still differs from commit, it starts fetchSnapshots in a new goroutine to
// pull newer state from the other nodes.
func (h handler) OnCommit(round uint64, commit uint64) {
	lock.Lock()
	defer lock.Unlock()

	fmt.Println("OnCommit", round, commit, pendingRound, pendingCommit)
	if pendingRound == round && pendingCommit == commit {
		// Publish a copy: assigning pendingState directly would make state and
		// pendingState the same map, so a later write through pendingState
		// would silently change the committed state.
		committed := make(map[string]string, len(pendingState))
		for key, value := range pendingState {
			committed[key] = value
		}
		state = committed
		stateCommit = commit
	}
	if stateCommit != commit {
		go fetchSnapshots()
	}
}
// LostLeadership clears isLeader and resets currentLeader to 0 while holding
// lock.
func (h handler) LostLeadership() {
	lock.Lock()
	defer lock.Unlock()

	isLeader, currentLeader = false, 0
}
// GainedLeadership sets isLeader and resets currentLeader to 0 while holding
// lock.
func (h handler) GainedLeadership() {
	lock.Lock()
	defer lock.Unlock()

	isLeader, currentLeader = true, 0
}
// main parses the -id and -key flags, creates and runs the libab node for this
// process, and serves the HTTP API on localhost:(8000+id).
//
// Routes:
//
//	GET  /          the current state as JSON; a non-leader that knows the
//	                leader redirects to the leader's HTTP endpoint instead
//	GET  /snapshot  the current state plus its commit number (see Snapshot)
//	POST /          replace the state with the JSON object in the request
//	                body; answered with 503 unless this node is the leader
//
// main never returns normally: it exits through log.Fatal on a setup error and
// panics with the error returned by node.Run or http.ListenAndServe.
func main() {
	flag.Uint64Var(&id, "id", 0, "node ID")
	flag.StringVar(&encryptionKey, "key", "", "encryption key")
	flag.Parse()

	addr, ok := cluster[id]
	if !ok {
		log.Fatal("invalid ID")
	}

	var err error
	node, err = ab.NewNode(id, addr, handler{}, len(cluster))
	if err != nil {
		log.Fatal(err)
	}
	for peerID, peer := range cluster {
		if peerID != id {
			node.AddPeer(peer)
		}
	}
	node.SetKey(encryptionKey)

	go func() {
		panic(node.Run())
	}()

	service := siesta.NewService("/")

	// NOTE(review): this is registered with AddPost; if siesta runs it after
	// the route handler has already written the body, the header is set too
	// late to reach the client — confirm against siesta's handler ordering.
	service.AddPost(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
	})

	service.Route("GET", "/", "", func(w http.ResponseWriter, r *http.Request) {
		lock.Lock()
		defer lock.Unlock()

		if !isLeader && currentLeader != 0 {
			leaderURL, known := clusterHTTP[currentLeader]
			if !known {
				// The reported leader has no HTTP endpoint configured, so
				// there is nowhere to send the client.
				w.WriteHeader(http.StatusServiceUnavailable)
				return
			}
			// A Location header is only honoured together with a 3xx status.
			w.Header().Set("Location", leaderURL)
			w.WriteHeader(http.StatusTemporaryRedirect)
			return
		}
		if err := json.NewEncoder(w).Encode(state); err != nil {
			log.Println("GET /: encoding state:", err)
		}
	})

	service.Route("GET", "/snapshot", "", func(w http.ResponseWriter, r *http.Request) {
		lock.Lock()
		defer lock.Unlock()

		snapshot := Snapshot{
			Data:   state,
			Commit: stateCommit,
		}
		if err := json.NewEncoder(w).Encode(snapshot); err != nil {
			log.Println("GET /snapshot: encoding snapshot:", err)
		}
	})

	service.Route("POST", "/", "", func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		newState := map[string]string{}
		if err := json.NewDecoder(r.Body).Decode(&newState); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		// Re-encode the decoded map so the appended payload is always a JSON
		// object of string values, whatever the request body looked like.
		buf := bytes.NewBuffer(nil)
		if err := json.NewEncoder(buf).Encode(newState); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		lock.Lock()
		defer lock.Unlock()

		if !isLeader {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}

		newRound, newCommit, err := node.Append(buf.String())
		if err != nil {
			// The append was not accepted; leave the pending state alone.
			log.Println("POST /: append:", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		pendingState = newState
		pendingRound = newRound
		pendingCommit = newCommit
	})

	panic(http.ListenAndServe(fmt.Sprintf("localhost:%d", id+8000), service))
}
func fetchSnapshots() {
for nodeID, endpoint := range clusterHTTP {
if nodeID == id {
continue
}
res, err := http.Get(endpoint + "/snapshot")
if err != nil {
continue
}
defer res.Body.Close()
snapshot := Snapshot{}
err = json.NewDecoder(res.Body).Decode(&snapshot)
if err != nil {
continue
}
lock.Lock()
if snapshot.Commit > stateCommit {
state = snapshot.Data
stateCommit = snapshot.Commit
}
lock.Unlock()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment