Created
April 8, 2013 06:34
-
-
Save olegfedoseev/5334690 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"bytes"
	"compress/zlib"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/signal"
	"runtime"
	"sort"
	"syscall"
	"time"

	zmq "github.com/alecthomas/gozmq"
)
// Data straight from pinba2zmq | |
type RawRequest struct { | |
hostname string | |
server string | |
script string | |
time float64 | |
cpu_time float64 | |
timers []Timer | |
} | |
type Message struct { | |
ts int | |
requests []RawRequest | |
} | |
// Timer is one timer entry attached to a request.
type Timer struct {
	// tags are the name/value pairs the timer was reported with.
	tags map[string]string
	// hits and time are the "hits" and "time" fields of the incoming timer
	// record.
	hits, time float64
}
type GroupedData struct { | |
tags map[string]string | |
count int | |
time Time | |
cpu_time Time | |
} | |
// Time holds the statistics computed over one set of samples.
//
// As filled in by aggregate: med is the element at index len/2 of the sorted
// samples, max is the largest sample, avg is the arithmetic mean, and p85 is
// the mean of the sorted samples starting at index int(0.15*len). dev is not
// assigned by aggregate and therefore stays zero.
type Time struct {
	med, max, avg, p85, dev float64
}
func parse_json(raw interface{}) (requests []RawRequest) { | |
for _, r := range raw.([]interface{}) { | |
m := r.(map[string]interface{}) | |
request := RawRequest{ | |
hostname: m["hostname"].(string), server: m["server"].(string), script: m["script"].(string), | |
time: m["time"].(float64), cpu_time: m["ru_utime"].(float64) + m["ru_stime"].(float64), | |
} | |
for _, val := range m["timers"].([]interface{}) { | |
t := val.(map[string]interface{}) | |
timer := Timer{ | |
hits: t["hits"].(float64), time: t["time"].(float64), tags: map[string]string{}, | |
} | |
for tag_name, tag_value := range t["tags"].(map[string]interface{}) { | |
timer.tags[tag_name] = tag_value.(string) | |
} | |
request.timers = append(request.timers, timer) | |
} | |
requests = append(requests, request) | |
} | |
return | |
} | |
func group_requests(requests *[]RawRequest) (result []GroupedData) { | |
tags := make(map[string]map[string]string) | |
time := make(map[string][]float64) | |
cpu_time := make(map[string][]float64) | |
for _, r := range *requests { | |
uniqid := r.hostname + r.server + r.script | |
tags[uniqid] = map[string]string{"host": r.hostname, "server": r.server, "script": r.script} | |
time[uniqid] = append(time[uniqid], r.time) | |
cpu_time[uniqid] = append(time[uniqid], r.cpu_time) | |
} | |
for uniqid, vals := range tags { | |
gd := GroupedData{ | |
tags: vals, | |
count: len(time[uniqid]), | |
time: aggregate(time[uniqid]), | |
cpu_time: aggregate(cpu_time[uniqid]), | |
} | |
result = append(result, gd) | |
} | |
return | |
} | |
func group_timers(requests *[]RawRequest) (result []GroupedData) { | |
tags := make(map[string]map[string]string) | |
time := make(map[string][]float64) | |
hits := make(map[string][]float64) | |
for _, r := range *requests { | |
for _, t := range r.timers { | |
uniqid := r.hostname + r.server + r.script | |
keys := make([]string, len(t.tags)) | |
i := 0 | |
for k, _ := range t.tags { | |
keys[i] = k | |
i++ | |
} | |
sort.Strings(keys) | |
for _, key := range keys { | |
uniqid += key + t.tags[key] | |
} | |
tags[uniqid] = map[string]string{"host": r.hostname, "server": r.server, "script": r.script} | |
for _, key := range keys { | |
tags[uniqid][key] = t.tags[key] | |
} | |
time[uniqid] = append(time[uniqid], t.time) | |
hits[uniqid] = append(hits[uniqid], float64(t.hits)) | |
} | |
} | |
for uniqid, vals := range tags { | |
gd := GroupedData{ | |
tags: vals, | |
count: int(sum(hits[uniqid])), | |
time: aggregate(time[uniqid]), | |
} | |
result = append(result, gd) | |
} | |
return | |
} | |
func aggregate(values []float64) Time { | |
if len(values) == 0 { | |
return Time{} | |
} | |
sort.Float64s(values) | |
idx := int(0.15 * float64(len(values))) | |
return Time{ | |
med: values[len(values) / 2], | |
max: values[len(values) - 1], | |
avg: sum(values) / float64(len(values)), | |
p85: sum(values[idx:]) / float64(len(values[idx:])), | |
} | |
} | |
// sum returns the total of values, added up from first to last. It returns 0
// for a nil or empty slice.
func sum(values []float64) float64 {
	total := 0.0
	for i := range values {
		total += values[i]
	}
	return total
}
// listenForSignals blocks until the process receives an interrupt (Ctrl-C)
// or a termination signal, prints "stopping" and then sends true on
// exit_channel. The send blocks until the value is received.
func listenForSignals(exit_channel chan bool) {
	// The signal package never blocks when delivering to the channel, so an
	// unbuffered channel can silently lose a signal that arrives while
	// nobody is receiving; one slot is enough for a single notification.
	signal_channel := make(chan os.Signal, 1)

	// Subscribe only to the signals that mean "stop". Subscribing to every
	// signal would also stop the program on unrelated ones such as SIGCHLD,
	// SIGWINCH or the SIGURG the Go runtime sends to itself.
	signal.Notify(signal_channel, os.Interrupt, syscall.SIGTERM)

	<-signal_channel
	// Restore the default handling so that a second signal terminates the
	// process immediately if the shutdown gets stuck.
	signal.Stop(signal_channel)

	fmt.Println("stopping")
	exit_channel <- true
}
func read_raw(content *[]byte) (int, []RawRequest){ | |
var out bytes.Buffer | |
r, _ := zlib.NewReader(io.Reader(bytes.NewBuffer(*content))) | |
io.Copy(&out, r) | |
r.Close() | |
var in [2]interface{} | |
json.Unmarshal(out.Bytes(), &in) | |
// Все запросы за секунду, без группировок | |
return int(in[0].(float64)), parse_json(in[1]) | |
} | |
func collector(interval int, start_time time.Time, in <-chan Message) { | |
count := 0 | |
buffer := []RawRequest{} | |
//start_time.Seconds() | |
for { | |
message := <- in | |
buffer = append(buffer, message.requests...) | |
count += 1 | |
if count == interval { | |
go func (buffer []RawRequest) { | |
t := time.Now() | |
// Сгруппированные запросы за секунду | |
requests := group_requests(&buffer) | |
// Сгруппированные таймеры за секунду | |
timers := group_timers(&buffer) | |
//fmt.Printf("%v\n", len(content)) | |
fmt.Printf("PROCESSED [%v] RawRequests: %v (requests: %v, timers: %v) in %v\n", | |
message.ts, len(buffer), len(requests), len(timers), time.Now().Sub(t)) | |
}(buffer) | |
buffer = []RawRequest{} | |
count = 0 | |
} | |
fmt.Printf("interval: %v, ts: %v, len(requests): %v, len(buffer): %v\n", interval, message.ts, len(message.requests), len(buffer)) | |
} | |
} | |
func main() { | |
fmt.Println("Boo!") | |
runtime.GOMAXPROCS(runtime.NumCPU()) | |
exit := make(chan bool) | |
exit_signal := false | |
go listenForSignals(exit) | |
context, _ := zmq.NewContext() | |
defer context.Close() | |
subscriber, _ := context.NewSocket(zmq.SUB) | |
defer subscriber.Close() | |
subscriber.Connect("tcp://172.16.5.130:5000") | |
subscriber.SetSockOptString(zmq.SUBSCRIBE, "") | |
data := make(chan []byte) | |
errors := make(chan error) | |
go func() { | |
for { | |
// Получение исходных данных от pinba2zmq | |
msgbytes, err := subscriber.Recv(0) | |
if err != nil { | |
errors <- err | |
} else { | |
data <- msgbytes | |
} | |
} | |
}() | |
// Хочу получать данные сгрупированные по секундно, по 10 секунд и по минутно | |
intervals := [3]int{1, 10, 60} | |
var channels map[int] chan Message = make(map[int] chan Message) | |
for _, interval := range intervals { | |
channels[interval] = make(chan Message, 1) | |
go collector(interval, time.Now(), channels[interval]) | |
} | |
for exit_signal == false { | |
select { | |
case exit_signal = <- exit: | |
fmt.Println("W: interrupt received, killing server...") | |
case err := <- errors: | |
fmt.Println("Receive Error:", err.Error()) | |
case msgbytes := <- data: | |
t := time.Now() | |
ts, raw_requests := read_raw(&msgbytes) | |
m := Message{ts: ts, requests: raw_requests} | |
for _, interval := range intervals { | |
channels[interval] <- m | |
} | |
fmt.Printf("RECV [%v] RawRequests: %v in %v\n", ts, len(raw_requests), time.Now().Sub(t)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment