Created
November 11, 2013 17:18
-
-
Save jjmalina/7416807 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Command log-processing inserts newline-delimited JSON log events read from
// standard input into a Riak bucket.
//
// Usage:
//
//	cat logs | log-processing --host=127.0.0.1 --port=10017 mybucket
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"flag" | |
"github.com/mrb/riakpbc" | |
"io" | |
"log" | |
"os" | |
"strconv" | |
) | |
// LogData holds the fields of a JSON log event that putToRiak turns into
// Riak secondary indexes. Any other keys in the event are ignored when
// decoding.
type LogData struct {
	// Timestamp is indexed as "timestamp_int", formatted without decimals.
	Timestamp float64
	// Code is indexed verbatim as "code_bin".
	Code string
	// Csid is indexed verbatim as "csid_bin".
	Csid string
}
func putToRiak(riak *riakpbc.Client, bucket string, data string) (string, error) { | |
var event LogData | |
if jsonerr := json.Unmarshal([]byte(data), &event); jsonerr != nil { | |
return "", jsonerr | |
} | |
response, riakErr := riak.ReqResp(&riakpbc.RpbPutReq{ | |
Bucket: []byte(bucket), | |
Content: &riakpbc.RpbContent{ | |
Value: []byte(data), | |
ContentType: []byte("application/json"), | |
Indexes: []*riakpbc.RpbPair{ | |
&riakpbc.RpbPair{ | |
Key: []byte("timestamp_int"), | |
Value: []byte(strconv.FormatFloat(event.Timestamp, 'f', 0, 64)), | |
}, | |
&riakpbc.RpbPair{ | |
Key: []byte("code_bin"), | |
Value: []byte(event.Code), | |
}, | |
&riakpbc.RpbPair{ | |
Key: []byte("csid_bin"), | |
Value: []byte(event.Csid), | |
}, | |
}, | |
}, | |
}, "RpbPutReq", false) | |
if riakErr != nil { | |
return "", riakErr | |
} else { | |
putresp := response.(*riakpbc.RpbPutResp) | |
key := string(putresp.Key) | |
return key, nil | |
} | |
} | |
func deleteKey(riak *riakpbc.Client, bucket string, key string) (string, error) { | |
if _, err := riak.DeleteObject(bucket, key); err != nil { | |
return key, err | |
} | |
return key, nil | |
} | |
type RiakWorker struct { | |
Id int | |
Client *riakpbc.Client | |
bucket string | |
Results chan string | |
workFunc func(*riakpbc.Client, string, string) (string, error) | |
} | |
func NewRiakWorker(id int, host *string, port *int, bucket string, | |
workFunc func(*riakpbc.Client, string, string) (string, error)) *RiakWorker { | |
client := riakpbc.NewClient([]string{*host + ":" + strconv.Itoa(*port)}) | |
if err := client.Dial(); err != nil { | |
panic(err.Error()) | |
} | |
return &RiakWorker{ | |
Id: id, | |
Client: client, | |
bucket: bucket, | |
Results: make(chan string), | |
workFunc: workFunc, | |
} | |
} | |
// Work is a single job handed to a RiakWorker.
type Work struct {
	// Data is the payload passed to the worker's work function: a raw input
	// line when loading, or a key when deleting.
	Data string
	// LineNumber is the position of the job in its input source, used only
	// in result messages.
	LineNumber int
}
func (w *RiakWorker) Run(jobs <-chan *Work) { | |
for work := range jobs { | |
w.process(work) | |
} | |
close(w.Results) | |
log.Println("Worker " + strconv.Itoa(w.Id) + " exiting.") | |
} | |
func (w *RiakWorker) process(job *Work) { | |
key, error := w.workFunc(w.Client, w.bucket, job.Data) | |
if error == nil { | |
w.Results <- key + " from line " + strconv.Itoa(job.LineNumber) | |
} else { | |
w.Results <- "error from line " + strconv.Itoa(job.LineNumber) | |
} | |
} | |
func stdinToChannel(input chan<- *Work) { | |
bio := bufio.NewReader(os.Stdin) | |
i := 0 | |
for { | |
line, err := bio.ReadString('\n') | |
if err != nil { | |
if err == io.EOF { | |
break | |
} | |
log.Println(err) | |
} | |
i = i + 1 | |
input <- &Work{Data: line, LineNumber: i} | |
} | |
close(input) | |
} | |
func listKeysIntoChannel(client *riakpbc.Client, bucket string) func(input chan<- *Work) { | |
return func(input chan<- *Work) { | |
keys, riakErr := client.ListKeys(bucket) | |
if riakErr != nil { | |
log.Println(riakErr.Error()) | |
} else { | |
for i := range keys { | |
input <- &Work{Data: string(keys[i]), LineNumber: i} | |
} | |
} | |
close(input) | |
} | |
} | |
func listenOnResultsChannels(delete bool, worker *RiakWorker, done chan *RiakWorker) { | |
for result := range worker.Results { | |
var msg string | |
if delete { | |
msg = "DELETED " + result | |
} else { | |
msg = "ADDED " + result | |
} | |
log.Println(msg) | |
} | |
done <- worker | |
} | |
func main() { | |
var host = flag.String("host", "127.0.0.1", "Riak Host") | |
var port = flag.Int("port", 10017, "Riak Protobuf Port") | |
var deleteMode = flag.Bool("deleteAll", false, "Delete all keys in the bucket (useful for cleaning up)") | |
flag.Parse() | |
args := flag.Args() | |
if len(args) < 1 { | |
log.Print("Bucket missing") | |
os.Exit(1) | |
} | |
bucket := args[0] | |
var workFunc func(*riakpbc.Client, string, string) (string, error) | |
var inputFunc func(chan<- *Work) | |
if *deleteMode { | |
client := riakpbc.NewClient([]string{*host + ":" + strconv.Itoa(*port)}) | |
if err := client.Dial(); err != nil { | |
panic(err.Error()) | |
} | |
workFunc = deleteKey | |
inputFunc = listKeysIntoChannel(client, bucket) | |
} else { | |
workFunc = putToRiak | |
inputFunc = stdinToChannel | |
} | |
jobs := make(chan *Work) | |
numWorkers := 3 | |
workers := make([]*RiakWorker, numWorkers) | |
// listen for results from workers | |
var done = make(chan *RiakWorker) | |
// spawn workers | |
for i := range workers { | |
workers[i] = NewRiakWorker(i, host, port, bucket, workFunc) | |
go workers[i].Run(jobs) | |
go listenOnResultsChannels(*deleteMode, workers[i], done) | |
} | |
// start putting work into workers | |
inputFunc(jobs) | |
for i:=0; i<numWorkers; i++ { | |
<-done | |
} | |
// done | |
log.Println("Exiting") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment