Last active
August 2, 2023 06:30
-
-
Save jerryan999/ef0aa671b0f15b73c8dbd225f411a963 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"sync" | |
"time" | |
"go.uber.org/ratelimit" | |
) | |
func Get(url string) ([]byte, int, error) { | |
resp, err := http.Get(url) | |
if err != nil { | |
return nil, resp.StatusCode, err | |
} | |
defer resp.Body.Close() | |
body, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
return nil, resp.StatusCode, err | |
} | |
return body, resp.StatusCode, nil | |
} | |
type Task struct { | |
seq int | |
url string | |
data []byte | |
err string | |
statusCode int | |
duration time.Duration | |
handleBy string | |
} | |
func (t Task) String() string { | |
return fmt.Sprintf("seq: %d, url: %s, handleBy: %s ,duration: %d, statusCode: %d, err: %s ...", t.seq, t.url, t.handleBy, t.duration, t.statusCode, t.err) | |
} | |
// worker is used to handle the task from the taskChan, and send the result to the resultsChan. | |
func worker(name string, taskChan <-chan Task, resultChan chan<- Task) { | |
for task := range taskChan { | |
start := time.Now() | |
body, code, err := Get(task.url) | |
if err != nil { | |
task.err = err.Error() | |
} | |
task.statusCode = code | |
task.data = body | |
task.handleBy = name | |
task.duration = time.Duration(time.Since(start).Milliseconds()) | |
resultChan <- task | |
} | |
} | |
func producer(rl ratelimit.Limiter) <-chan Task { | |
var tasks []Task | |
for i := 0; i < 1000; i++ { | |
url := "https://httpbin.org/get?i=" + fmt.Sprintf("%d", i) | |
tasks = append(tasks, Task{seq: i, url: url}) | |
} | |
fmt.Println("total urls: ", len(tasks)) | |
out := make(chan Task) | |
go func() { | |
defer close(out) | |
for _, task := range tasks { | |
rl.Take() | |
out <- task | |
} | |
}() | |
return out | |
} | |
func main() { | |
rl := ratelimit.New(500, ratelimit.Per(60*time.Second)) // per second | |
taskChan := producer(rl) | |
resultsChan := make(chan Task) | |
// start 5 workers | |
numWorker := 5 | |
var wg sync.WaitGroup | |
for i := 0; i < numWorker; i++ { | |
wg.Add(1) | |
go func(x int) { | |
defer wg.Done() | |
worker(fmt.Sprintf("worker-%d", x), taskChan, resultsChan) | |
}(i) | |
} | |
// a must have goroutine to close the resultChan | |
go func() { | |
wg.Wait() | |
close(resultsChan) | |
}() | |
for result := range resultsChan { | |
fmt.Println(result) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment