Last active
October 11, 2016 14:59
-
-
Save icub3d/7505798 to your computer and use it in GitHub Desktop.
An example of using doozer to keep track of your groupcache instances. If you want to run it yourself, you'll want to: go get github.com/golang/groupcache
go get github.com/ha/doozer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"flag" | |
"fmt" | |
"github.com/golang/groupcache" | |
"github.com/ha/doozer" | |
"io" | |
"log" | |
"net" | |
"net/http" | |
"os" | |
"os/signal" | |
"path" | |
"strings" | |
"sync/atomic" | |
) | |
var ( | |
addr string // The address of our httpd server. | |
daddr string // The address of the doozer server. | |
dictName string // The name of the dictionary. | |
dictAddr string // The address of the dictionary server. | |
// This is our groupcache stuff. | |
pool *groupcache.HTTPPool | |
dict *groupcache.Group | |
) | |
func init() { | |
flag.StringVar(&addr, "addr", "127.0.0.1:8000", | |
"the addr:port on which this server should run.") | |
flag.StringVar(&daddr, "doozer", "127.0.0.1:8046", | |
"the addr:port on which doozer is running.") | |
flag.StringVar(&dictName, "dictname", "gcide", | |
"the name of the dictionary to query.") | |
flag.StringVar(&dictAddr, "dictaddr", "dict.org:2628", | |
"the addr:port to the dict server to query.") | |
} | |
func main() { | |
flag.Parse() | |
// Setup the doozer connection. | |
d, err := doozer.Dial(daddr) | |
if err != nil { | |
log.Fatalf("connecting to doozer: %v\n", err) | |
} | |
defer d.Close() | |
// Setup the cache. | |
pool = groupcache.NewHTTPPool("http://" + addr) | |
dict = groupcache.NewGroup("dict", 64<<20, groupcache.GetterFunc( | |
func(ctx groupcache.Context, key string, dest groupcache.Sink) error { | |
def, err := query(key) | |
if err != nil { | |
err = fmt.Errorf("querying remote dictionary: %v", err) | |
log.Println(err) | |
return err | |
} | |
log.Println("retrieved remote definition for", key) | |
dest.SetString(def) | |
return nil | |
})) | |
// Start watching for changes and signals. | |
go watch(d) | |
// Add the handler for definition requests and then start the | |
// server. | |
http.Handle("/define/", http.HandlerFunc(handler)) | |
log.Println(http.ListenAndServe(addr, nil)) | |
} | |
// watch updates the peer list of servers based on changes to the | |
// doozer configuration or signals from the OS. | |
func watch(d *doozer.Conn) { | |
peerFile := "/peers" | |
var peers []string | |
var rev int64 | |
// Run the initial get. | |
data, rev, err := d.Get(peerFile, nil) | |
if err != nil { | |
log.Println("initial peer list get:", err) | |
log.Println("using empty set to start") | |
peers = []string{} | |
} else { | |
peers = strings.Split(string(data), " ") | |
} | |
// Add myself to the list. | |
peers = append(peers, "http://"+addr) | |
rev, err = d.Set(peerFile, rev, | |
[]byte(strings.Join(peers, " "))) | |
if err != nil { | |
log.Println("unable to add myself to the peer list (no longer watching).") | |
return | |
} | |
pool.Set(peers...) | |
log.Println("added myself to the peer list.") | |
// Setup signal handling to deal with ^C and others. | |
sigs := make(chan os.Signal, 1) | |
signal.Notify(sigs, os.Interrupt, os.Kill) | |
// Get the channel that's listening for changes. | |
updates := wait(d, peerFile, &rev) | |
for { | |
select { | |
case <-sigs: | |
// Remove ourselves from the peer list and exit. | |
for i, peer := range peers { | |
if peer == "http://"+addr { | |
peers = append(peers[:i], peers[i+1:]...) | |
d.Set(peerFile, rev, []byte(strings.Join(peers, " "))) | |
log.Println("removed myself from peer list before exiting.") | |
} | |
} | |
os.Exit(1) | |
case update, ok := <-updates: | |
// If the channel was closed, we should stop selecting on it. | |
if !ok { | |
updates = nil | |
continue | |
} | |
// Otherwise, update the peer list. | |
peers = update | |
log.Println("got new peer list:", strings.Join(peers, " ")) | |
pool.Set(peers...) | |
} | |
} | |
} | |
// wait waits on a changes for the fiven file starting at the given | |
// revision from the given doozer connection. It sends updated peer | |
// lists on the returned channel. | |
func wait(d *doozer.Conn, file string, rev *int64) chan []string { | |
c := make(chan []string, 1) | |
cur := *rev | |
go func() { | |
for { | |
// Wait for the change. | |
e, err := d.Wait(file, cur+1) | |
if err != nil { | |
log.Println("waiting failed (no longer watching):", err) | |
close(c) | |
return | |
} | |
// Update the revision and send the change on the channel. | |
atomic.CompareAndSwapInt64(rev, cur, e.Rev) | |
cur = e.Rev | |
c <- strings.Split(string(e.Body), " ") | |
} | |
}() | |
return c | |
} | |
// handler handles all incoming requests for a definition. | |
func handler(w http.ResponseWriter, r *http.Request) { | |
log.Println("received request:", r.Method, r.URL.Path) | |
word := strings.Trim(path.Base(r.URL.Path), "/") | |
// Get the definition from groupcache and write it out. | |
var data []byte | |
err := dict.Get(nil, word, groupcache.AllocatingByteSliceSink(&data)) | |
if err != nil { | |
log.Println("retreiving definition for", word, "-", err) | |
w.WriteHeader(http.StatusInternalServerError) | |
return | |
} | |
io.Copy(w, bytes.NewReader(data)) | |
} | |
// query is a helper function for the groupcache that queries a remote | |
// dict server for the first definition of the given word. | |
func query(word string) (string, error) { | |
// NOTE: I am aware this is brittle and doesn't really follow the | |
// protocol all that well. | |
conn, err := net.Dial("tcp", dictAddr) | |
if err != nil { | |
return "", fmt.Errorf("connecting to dict: %v", err) | |
} | |
defer conn.Close() | |
// Send the DEFINE request and read the response into a buffer. | |
fmt.Fprintf(conn, "DEFINE %s %s\r\n", dictName, word) | |
scanner := bufio.NewScanner(conn) | |
var response bytes.Buffer | |
for scanner.Scan() { | |
// Read the line, trim any excess new lines | |
line := scanner.Text() | |
line = strings.Trim(line, "\r\n") | |
if strings.HasPrefix(line, "2") || strings.HasPrefix(line, "1") { | |
// Skip over any control data. | |
continue | |
} | |
if line == "." || line == "" { | |
// Quit when we reach the end of the first definition. | |
break | |
} | |
// Store the line we just read. | |
response.WriteString(line) | |
response.WriteString("\n") | |
} | |
// Check for errors after the scan. | |
if err := scanner.Err(); err != nil { | |
return "", fmt.Errorf("reading line from connection: %v", err) | |
} | |
// Send the QUIT message and return the definition. | |
fmt.Fprintf(conn, "QUIT\r\n") | |
return response.String(), nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment