Last active
January 6, 2017 19:23
-
-
Save macalinao/bc08b9a6ec31cb496feaf058dccd6fff to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package processor | |
import ( | |
"sync" | |
"time" | |
"github.com/Sirupsen/logrus" | |
) | |
// Metrics records processed summoners and matches and logs progress. | |
type Metrics struct { | |
Logger *logrus.Logger `inject:"t"` | |
reqCt map[string]int | |
reqs chan string | |
reqCtMu sync.Mutex | |
} | |
// Start starts the metrics. | |
func (m *Metrics) Start() { | |
m.reqCt = map[string]int{} | |
m.reqs = make(chan string) | |
// Show rate | |
go func() { | |
for range time.Tick(5 * time.Second) { | |
m.Logger.Infof("===") | |
m.reqCtMu.Lock() | |
total := 0 | |
for reqType, ct := range m.reqCt { | |
m.Logger.Infof("- %s: %d (%.2f/sec)", reqType, ct, float64(ct)/5.0) | |
total += ct | |
} | |
m.Logger.Infof("TOTAL: %d (%.2f/sec)", total, float64(total)/5.0) | |
m.reqCt = map[string]int{} | |
m.reqCtMu.Unlock() | |
} | |
}() | |
// Process channels | |
for endpoint := range m.reqs { | |
m.reqCtMu.Lock() | |
m.reqCt[endpoint] += 1 | |
m.reqCtMu.Unlock() | |
} | |
} | |
// RecordSummoner records a summoner. | |
func (m *Metrics) Record(endpoint string) { | |
m.reqs <- endpoint | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Stream of metric labels; producers push plain strings into this subject.
val metrics = PublishToOneSubject[String]()

// Reporter: each batch produced by the 10-second timed buffer is printed as
// one line per distinct label, with the number of times it occurred.
metrics.bufferTimed(10.seconds).foreach { batch =>
  println("=== METRICS 10S ==")
  batch.groupBy(identity).foreach { case (label, occurrences) =>
    println(s"${label}: ${occurrences.length} vals")
  }
  println("=== END METRICS ===")
}
/** Wraps `endpoint(fut)` with "start" and "end" metric events.
  *
  * The events are labelled with the runtime class name of `T`. The steps are
  * chained sequentially: the "start" event is pushed, then `endpoint(fut)` is
  * awaited, then the "end" event is pushed. If any step fails, the later
  * steps are skipped, so no "end" event is emitted for a failed call.
  *
  * @param fut the future handed to `endpoint`
  * @param tag class tag used to derive the metric label
  * @tparam T result type of the measured call
  * @return the result of `endpoint(fut)`, completed after the "end" event
  *         has been pushed
  */
def measuredEndpoint[T](fut: Future[T])(implicit tag: ClassTag[T]): Future[T] = {
  val label = tag.runtimeClass.getName
  metrics.onNext(s"start: $label").flatMap { _ =>
    endpoint(fut).flatMap { result =>
      metrics.onNext(s"end: $label").map(_ => result)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment