Created
July 29, 2019 22:34
-
-
Save JnBrymn/46eea22ac1d16a0e23e57071372cabe2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"fmt"
	"math/rand"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)
// Library code ////////////////////////////////////////////////////////////////////////////////////////////////////////
// Report is the opaque result produced by running a single WorkUnit. The library code never inspects it; it is
// handed unchanged to ReportAggregator.AddReport.
type Report interface{}
type WorkUnit interface { | |
Run() Report | |
} | |
type ReportAggregator interface { | |
AddReport(Report) | |
PeriodicReport() | |
FinalReport() | |
} | |
// WorkGrantToken is the value placed on the work-grants channel. Only the presence of a token matters, so the
// value itself is just a placeholder.
const WorkGrantToken = true
type WorkRunner struct { | |
workUnitChan chan WorkUnit | |
workGrantsChan chan interface{} | |
reportAggregator ReportAggregator | |
maxConcurrentWorkUnits uint | |
reportingPeriod time.Duration | |
runTime time.Duration | |
} | |
// Creates a WorkRunner which runs WorkUnits from workUnitChan as fast as it can without exceeding | |
// maxConcurrentWorkUnits. Once each WorkUnit is done, it submits a Report to the provided reportAggregator. | |
// After the specified reportingPeriod, new jobs are not issued and the Run immediately returns. | |
func NewWorkRunner( | |
workUnitChan chan WorkUnit, | |
reportAggregator ReportAggregator, | |
maxConcurrentWorkUnits uint, | |
reportingPeriod time.Duration, | |
runTime time.Duration, | |
) *WorkRunner { | |
wr := new(WorkRunner) | |
wr.workUnitChan = workUnitChan | |
wr.reportAggregator = reportAggregator | |
wr.maxConcurrentWorkUnits = maxConcurrentWorkUnits | |
wr.reportingPeriod = reportingPeriod | |
wr.runTime = runTime | |
return wr | |
} | |
func (wr *WorkRunner) Run() { | |
// Create channel used to limit concurrent work. You can only run a WorkUnit if there are work grants available. | |
// There are only maxConcurrentWorkUnits work grants. Once work is finished, the work grant is returned to be used | |
// again. | |
wr.workGrantsChan = make(chan interface{}, wr.maxConcurrentWorkUnits) | |
for i := uint(0); i < wr.maxConcurrentWorkUnits; i++ { | |
wr.workGrantsChan <- WorkGrantToken | |
} | |
// Set up reporting tics | |
reportingPeriodTick := time.Tick(wr.reportingPeriod) | |
runFinalizationAlert := time.After(wr.runTime) | |
for { | |
select { | |
case <-wr.workGrantsChan: | |
go wr.doSingleUnitOfWork() | |
case <-reportingPeriodTick: | |
fmt.Println("Concurrent work units:", cap(wr.workGrantsChan) - len(wr.workGrantsChan)) | |
go wr.reportAggregator.PeriodicReport() | |
case <-runFinalizationAlert: | |
wr.reportAggregator.FinalReport() | |
// TODO make it so that optionally we can wait till all outstanding work units are done | |
// OR use a channel to `close` to indicate that the job is done | |
return | |
} | |
} | |
} | |
func (wr *WorkRunner) doSingleUnitOfWork() { | |
workUnit := <-wr.workUnitChan | |
report := workUnit.Run() | |
wr.reportAggregator.AddReport(report) | |
wr.workGrantsChan <- WorkGrantToken | |
} | |
// Application code ////////////////////////////////////////////////////////////////////////////////////////////////////
// myReport is the Report produced by myWorkUnit.
type myReport struct {
	// success tells whether the work unit is counted as successful.
	success bool
	// latency is how long the work unit took to run.
	latency time.Duration
}
// myWorkUnit is a sample WorkUnit whose workload is selected by mode.
type myWorkUnit struct {
	// mode is one of the SLEEP, HTTP_GET or READ_FILE constants.
	mode string
}
func (wu *myWorkUnit) Run() Report { | |
startTs := time.Now() | |
switch wu.mode { | |
case SLEEP: | |
sleepRandTime() | |
case HTTP_GET: | |
getNonExistentWebsite() | |
case READ_FILE: | |
readRandBytes() | |
} | |
return &myReport{ | |
success: rand.Float64() > 0.10, | |
latency: time.Now().Sub(startTs), | |
} | |
} | |
// sleepRandTime blocks the calling goroutine for a random duration of at least zero and less than one second.
func sleepRandTime() {
	fraction := rand.Float64() // in [0.0, 1.0)
	time.Sleep(time.Duration(fraction * float64(time.Second)))
}
// readRandBytes opens /dev/random and performs ten reads of up to ten bytes each, discarding the data. It exists
// purely to generate file-read load. Failures are reported on stderr and end the function early; they are never
// fatal.
func readRandBytes() {
	f, err := os.Open("/dev/random")
	if err != nil {
		fmt.Fprintln(os.Stderr, "readRandBytes:", err)
		return
	}
	// Deferred so the descriptor is released on every exit path.
	defer f.Close()

	buf := make([]byte, 10)
	for i := 0; i < 10; i++ {
		if _, err := f.Read(buf); err != nil {
			fmt.Fprintln(os.Stderr, "readRandBytes:", err)
			return
		}
	}
}
// getNonExistentWebsite issues an HTTP GET to "http://<random non-negative int32>.com/" and prints the response
// and error. The host name is random, so the request is expected to fail most of the time; the function exists
// purely to generate network load.
//
// NOTE: http.Get uses http.DefaultClient, which has no timeout, so a single call may block for a long time.
func getNonExistentWebsite() {
	url := "http://" + strconv.Itoa(int(rand.Int31())) + ".com/"
	resp, err := http.Get(url)
	if err == nil {
		// A successful Get hands back an open body; it must be closed or the connection is leaked.
		defer resp.Body.Close()
	}
	fmt.Println(resp, err)
}
type myReportAggregator struct { | |
numCompletedWorkUnits int | |
numSuccesses int | |
totalWait time.Duration | |
startTs time.Time | |
} | |
func NewMyReportAggregator() ReportAggregator { | |
ra := &myReportAggregator{} | |
ra.startTs = time.Now() | |
return ra | |
} | |
func (ra *myReportAggregator) AddReport(report Report) { | |
myReport := report.(*myReport) | |
// TODO add sync.Lock | |
// OR ACTUALLY, there's a way to use channels in WorkRunner such that ReportAggregator keeps the current simple | |
// interface, but you man make sure that the info isn't updated concurrently and we don't have to make it blocking | |
// (e.g. no sync.Lock) | |
ra.numCompletedWorkUnits++ | |
if myReport.success { | |
ra.numSuccesses++ | |
} | |
ra.totalWait += myReport.latency | |
} | |
func (ra *myReportAggregator) PeriodicReport() { | |
//TODO add sync.Lock | |
fmt.Printf( | |
"average latency: %v\nfail rate: %v\n\n", | |
time.Duration(float64(ra.totalWait)/float64(ra.numCompletedWorkUnits)), | |
1 - float64(ra.numSuccesses)/float64(ra.numCompletedWorkUnits), | |
) | |
} | |
func (ra *myReportAggregator) FinalReport() { | |
duration := time.Now().Sub(ra.startTs) | |
fmt.Println("total completed: ", ra.numCompletedWorkUnits) | |
fmt.Println("total time: ", duration) | |
fmt.Println("units per second: ", ra.numCompletedWorkUnits / int(duration.Seconds())) | |
} | |
func getWorkChan(mode string) chan WorkUnit { | |
workChan := make(chan WorkUnit) | |
go func() { | |
for { | |
workChan <-&myWorkUnit{mode} | |
} | |
}() | |
return workChan | |
} | |
// Workload modes understood by myWorkUnit.Run.
const (
	// SLEEP makes the work unit sleep for a random amount of time.
	SLEEP = "sleep"
	// HTTP_GET makes the work unit issue an HTTP GET to a randomly named host.
	HTTP_GET = "httpGet"
	// READ_FILE makes the work unit read bytes from /dev/random.
	READ_FILE = "readFile"
)
func main() { | |
rand.Seed(int64(time.Now().Unix())) | |
workChan := getWorkChan(HTTP_GET) | |
maxConcurrentWorkUnits := uint(250000) | |
//maxConcurrentWorkUnits := uint(10) | |
reportingPeriod := time.Second | |
runTime := time.Second * 15 | |
workRunner := NewWorkRunner( | |
workChan, | |
NewMyReportAggregator(), | |
maxConcurrentWorkUnits, | |
reportingPeriod, | |
runTime, | |
) | |
workRunner.Run() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment