Last active
February 8, 2023 14:21
-
-
Save tanishiking/be9fac4aa02419d68c6770a85e53c936 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"sync" | |
) | |
// Dispatcher represents a job dispatcher. | |
type Dispatcher struct { | |
sem chan struct{} // semaphore | |
jobBuffer chan *Job | |
worker Worker | |
wg sync.WaitGroup | |
} | |
// NewDispatcher will create a new instance of job dispatcher. | |
// maxWorkers means the maximum number of goroutines that can work concurrently. | |
// buffers means the maximum size of the queue. | |
func NewDispatcher(worker Worker, maxWorkers int, buffers int) *Dispatcher { | |
return &Dispatcher{ | |
// Restrict the number of goroutine using buffered channel (as counting semaphor) | |
sem: make(chan struct{}, maxWorkers), | |
jobBuffer: make(chan *Job, buffers), | |
worker: worker, | |
} | |
} | |
// Start starts a dispatcher. | |
// This dispatcher will stops when it receive a value from `ctx.Done`. | |
func (d *Dispatcher) Start(ctx context.Context) { | |
d.wg.Add(1) | |
go d.loop(ctx) | |
} | |
// Wait blocks until the dispatcher stops. | |
func (d *Dispatcher) Wait() { | |
d.wg.Wait() | |
} | |
// Add enqueues a job into the queue. | |
// If the number of enqueued jobs has already reached to the maximum size, | |
// this will block until the other job has finish and the queue has space to accept a new job. | |
func (d *Dispatcher) Add(job *Job) { | |
d.jobBuffer <- job | |
} | |
func (d *Dispatcher) stop() { | |
d.wg.Done() | |
} | |
func (d *Dispatcher) loop(ctx context.Context) { | |
var wg sync.WaitGroup | |
Loop: | |
for { | |
select { | |
case <-ctx.Done(): | |
// block until all the jobs finishes | |
wg.Wait() | |
break Loop | |
case job := <-d.jobBuffer: | |
// Increment the waitgroup | |
wg.Add(1) | |
// Decrement a semaphore count | |
d.sem <- struct{}{} | |
go func(job *Job) { | |
defer wg.Done() | |
// After the job finished, increment a semaphore count | |
defer func() { <-d.sem }() | |
d.worker.Work(job) | |
}(job) | |
} | |
} | |
d.stop() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// Job is a unit of work that can be enqueued into a dispatcher.
type Job struct {
	// URL is the address carried by the job.
	URL string
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"os" | |
"os/signal" | |
"syscall" | |
) | |
func main() { | |
ctx, cancel := context.WithCancel(context.Background()) | |
sigCh := make(chan os.Signal, 1) | |
defer close(sigCh) | |
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT) | |
go func() { | |
// wait until receiving the signal | |
<-sigCh | |
cancel() | |
}() | |
p := NewPrinter() | |
d := NewDispatcher(p, 10, 1000) | |
d.Start(ctx) | |
for i := 0; i < 100; i++ { | |
url := fmt.Sprintf("http://example.com/%d", i) | |
job := &Job{URL: url} | |
d.Add(job) | |
} | |
d.Wait() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"time" | |
) | |
// Printer is a demo worker: all it does with a job is print the job's URL.
type Printer struct{}
func NewPrinter() *Printer { | |
return &Printer{} | |
} | |
// Work waits for a few seconds and print a received URL. | |
func (p *Printer) Work(j *Job) { | |
t := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second) | |
defer t.Stop() | |
<-t.C | |
fmt.Println(j.URL) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
type Worker interface { | |
Work(j *Job) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment