Last active
December 30, 2015 06:29
-
-
Save jjmalina/7789435 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// insertion or deletion of arbitrary json logs to riak | |
// usage: | |
// cat logs | write-or-delete-logs --hosts=127.0.0.1,127.0.0.2 --port=8087 mybucket 2>> write_log & | |
// OR you can delete EVERYTHING | |
// write-or-delete-logs --delete --hosts=127.0.0.1,127.0.0.2 --port=8087 mybucket | |
package main | |
import ( | |
"bufio" | |
"flag" | |
"github.com/mrb/riakpbc" | |
"io" | |
"log" | |
"os" | |
"strconv" | |
"strings" | |
) | |
type Log struct { | |
Id string | |
Payload string | |
} | |
func putToRiak(riak *riakpbc.Client, bucket string, data string) (string, error) { | |
var log Log | |
if jsonerr := json.Unmarshal([]byte(data), &log); jsonerr != nil { | |
return "", jsonerr | |
} | |
response, riakErr := riak.ReqResp(&riakpbc.RpbPutReq{ | |
Bucket: []byte(bucket), | |
Content: &riakpbc.RpbContent{ | |
Value: []byte(data), | |
ContentType: []byte("application/json"), | |
Indexes: []*riakpbc.RpbPair{ | |
&riakpbc.RpbPair{ | |
Key: []byte("payload_bin"), | |
Value: []byte(log.Payload), | |
}, | |
}, | |
}, | |
}, "RpbPutReq", false) | |
if riakErr != nil { | |
return "", riakErr | |
} else { | |
putresp := response.(*riakpbc.RpbPutResp) | |
return string(putresp.Key), nil | |
} | |
} | |
func deleteKey(riak *riakpbc.Client, bucket string, key string) (string, error) { | |
if _, err := riak.DeleteObject(bucket, key); err != nil { | |
return key, err | |
} | |
return key, nil | |
} | |
type RiakWorker struct { | |
Id int | |
Client *riakpbc.Client | |
bucket string | |
Results chan *Work | |
workFunc func(*riakpbc.Client, string, string) (string, error) | |
} | |
func NewRiakWorker(id int, nodes []string, bucket string, | |
workFunc func(*riakpbc.Client, string, string) (string, error)) *RiakWorker { | |
client := riakpbc.NewClient(nodes) | |
if err := client.Dial(); err != nil { | |
panic(err.Error()) | |
} | |
return &RiakWorker{ | |
Id: id, | |
Client: client, | |
bucket: bucket, | |
Results: make(chan *Work), | |
workFunc: workFunc, | |
} | |
} | |
type Work struct { | |
Data string | |
Key string | |
Success bool | |
LineNumber int | |
} | |
func (w *RiakWorker) Run(jobs <-chan *Work) { | |
for work := range jobs { | |
w.process(work) | |
} | |
close(w.Results) | |
log.Println("Worker " + strconv.Itoa(w.Id) + " exiting.") | |
} | |
func (w *RiakWorker) process(job *Work) { | |
key, error := w.workFunc(w.Client, w.bucket, job.Data) | |
if error != nil { | |
job.Success = false | |
} else { | |
job.Success = true | |
job.Key = key | |
} | |
w.Results <- job | |
} | |
func stdinToChannel(input chan<- *Work) { | |
bio := bufio.NewReader(os.Stdin) | |
i := 0 | |
for { | |
line, err := bio.ReadString('\n') | |
if err != nil { | |
if err == io.EOF { | |
break | |
} | |
log.Println(err) | |
} | |
i = i + 1 | |
input <- &Work{Data: line, LineNumber: i} | |
} | |
close(input) | |
} | |
func listKeysIntoChannel(client *riakpbc.Client, bucket string) func(input chan<- *Work) { | |
return func(input chan<- *Work) { | |
keys, riakErr := client.ListKeys(bucket) | |
if riakErr != nil { | |
log.Println(riakErr.Error()) | |
} else { | |
for i := range keys { | |
input <- &Work{Data: string(keys[i]), LineNumber: i} | |
} | |
} | |
close(input) | |
} | |
} | |
func listenOnResultsChannels(delete bool, worker *RiakWorker, done chan *RiakWorker) { | |
for job := range worker.Results { | |
var msg string | |
if job.Success { | |
if delete { | |
msg = "DELETED " + job.Key | |
} else { | |
msg = "ADDED " + job.Key | |
} | |
msg = msg + " from line " + strconv.Itoa(job.LineNumber) | |
} else { | |
msg = "ERROR from line " + strconv.Itoa(job.LineNumber) | |
} | |
log.Println(msg) | |
} | |
done <- worker | |
} | |
func main() { | |
var hostlist = flag.String("hosts", "127.0.0.1", "Comma delimited list of hosts") | |
var port = flag.Int("port", 8087, "Riak Protobuf Port") | |
var nworkers = flag.Int("workers", 3, "Number of workers") | |
var deleteMode = flag.Bool("delete", false, "Delete all keys in the bucket (useful for cleaning up)") | |
flag.Parse() | |
args := flag.Args() | |
if len(args) < 1 { | |
log.Print("Bucket missing") | |
os.Exit(1) | |
} | |
bucket := args[0] | |
hosts := strings.Split(*hostlist, ",") | |
var workFunc func(*riakpbc.Client, string, string) (string, error) | |
var inputFunc func(chan<- *Work) | |
var nodes = make([]string, len(*hostlist)) | |
for i, host := range hosts { | |
nodes[i] = host + ":" + strconv.Itoa(*port) | |
} | |
if *deleteMode { | |
client := riakpbc.NewClient(nodes) | |
if err := client.Dial(); err != nil { | |
panic(err.Error()) | |
} | |
workFunc = deleteKey | |
inputFunc = listKeysIntoChannel(client, bucket) | |
} else { | |
workFunc = putToRiak | |
inputFunc = stdinToChannel | |
} | |
jobs := make(chan *Work) | |
workers := make([]*RiakWorker, *nworkers) | |
var done = make(chan *RiakWorker) | |
// spawn workers | |
for i := range workers { | |
workers[i] = NewRiakWorker(i, nodes, bucket, workFunc) | |
go workers[i].Run(jobs) | |
go listenOnResultsChannels(*deleteMode, workers[i], done) | |
} | |
// start putting work into workers | |
inputFunc(jobs) | |
for i := 0; i < *nworkers; i++ { | |
<-done | |
} | |
log.Println("Exiting") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment