Skip to content

Instantly share code, notes, and snippets.

@shuhei
Last active August 19, 2017 09:14
Show Gist options
  • Save shuhei/bf55d6510003c4649b08060e9f2c99f5 to your computer and use it in GitHub Desktop.
Save shuhei/bf55d6510003c4649b08060e9f2c99f5 to your computer and use it in GitHub Desktop.
Process only one job for a same ID at a moment
package main
import (
"fmt"
"time"
)
func main() {
input := make(chan int)
go startMaster(input)
input <- 1
input <- 1
input <- 1
input <- 2
input <- 2
input <- 3
input <- 1
input <- 2
for {
}
}
// master
func startMaster(input chan int) {
// If workers need data other than ID, this could be a map of ID and queue.
jobs := make(map[int]int)
done := make(chan int)
var id int
for {
select {
case id = <-input:
if jobs[id] == 0 {
go startWorker(id, done)
}
jobs[id] += 1
case id = <-done:
jobs[id] -= 1
if jobs[id] > 0 {
go startWorker(id, done)
}
}
}
}
// worker
func startWorker(id int, done chan int) {
fmt.Println("start", id)
time.Sleep(100 * time.Millisecond)
fmt.Println("end", id)
done <- id
}
package main
import (
"fmt"
"time"
)
type Job struct {
groupId int
message string
}
func main() {
input := make(chan *Job)
go startMaster(input)
input <- &Job{groupId: 1, message: "nihao"}
input <- &Job{groupId: 1, message: "ciao"}
input <- &Job{groupId: 1, message: "hello"}
input <- &Job{groupId: 2, message: "hallo"}
input <- &Job{groupId: 2, message: "konnnichiwa"}
input <- &Job{groupId: 3, message: "bonjour"}
input <- &Job{groupId: 1, message: "hola"}
input <- &Job{groupId: 2, message: "namaste"}
for {
}
}
// master
func startMaster(input chan *Job) {
jobs := make(map[int][]*Job)
done := make(chan int)
for {
select {
case job := <-input:
if jobs[job.groupId] == nil {
// TODO: Is this the best way for making a queue?
jobs[job.groupId] = make([]*Job, 0, 10)
}
if len(jobs[job.groupId]) == 0 {
go startWorker(job, done)
}
jobs[job.groupId] = append(jobs[job.groupId], job)
case groupId := <-done:
// TODO: Doesn't this leak the first element?
jobs[groupId] = jobs[groupId][1:]
if len(jobs[groupId]) > 0 {
go startWorker(jobs[groupId][0], done)
}
}
}
}
// worker
func startWorker(job *Job, done chan int) {
defer notifyDone(job.groupId, done)
fmt.Println("start", job.groupId, job.message)
time.Sleep(100 * time.Millisecond)
fmt.Println("end", job.groupId, job.message)
}
// TODO: Do we really need a function to do `defer done <- job.groupId`?
func notifyDone(groupId int, done chan int) {
done <- groupId
}
@kenzan100
Copy link

kenzan100 commented Aug 19, 2017

いいですね!

昨晩やってみたのは

type lock struct {
	mu *sync.Mutex
	customerLocks map[int]*sync.Mutex
}

func processXXX(id) {
	lock.mu.Lock()
	if lock.customerLocks[id] == nil {
		lock.customerLocks[id] = &sync.Mutex{}
	}
	lock.mu.Unlock()
	lock.customerLocks[id].Lock()
	defer lock.customerLocks[id].Unlock()
        // do exclusive logic per id
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment