Skip to content

Instantly share code, notes, and snippets.

@slavone
Last active May 30, 2022 14:49
Show Gist options
  • Save slavone/a55eb39f36034af4d1170c2995eb3e27 to your computer and use it in GitHub Desktop.
Save slavone/a55eb39f36034af4d1170c2995eb3e27 to your computer and use it in GitHub Desktop.
package main
import (
"log"
"sync"
"time"
)
type Worker struct {
name string
wg *sync.WaitGroup
ch <-chan string
}
type Queue struct {
lock *sync.Mutex
subscribers map[string]chan string
}
func NewWorker(id string, ch <-chan string, wg *sync.WaitGroup) *Worker {
wg.Add(1)
return &Worker{
ch: ch,
wg: wg,
name: id,
}
}
func NewQueue() Queue {
return Queue{
subscribers: make(map[string]chan string),
lock: &sync.Mutex{},
}
}
func (w Worker) Run(pingEvery time.Duration) {
ticker := time.NewTicker(pingEvery)
for {
select {
case <-ticker.C:
log.Printf("Run %s ping", w.name)
case msg, open := <-w.ch:
if !open {
log.Printf("Run %s closed", w.name)
w.wg.Done()
return
}
log.Printf("Run %s received message %s", w.name, msg)
}
}
}
func (q Queue) Sub(workerID string) <-chan string {
q.lock.Lock()
defer q.lock.Unlock()
ch := make(chan string)
q.subscribers[workerID] = ch
return ch
}
func (q Queue) Unsub(workerID string) {
q.lock.Lock()
ch, ok := q.subscribers[workerID]
if !ok {
log.Panic("not found workerID")
}
delete(q.subscribers, workerID)
q.lock.Unlock()
close(ch)
}
func (q Queue) Pub(msg string) {
q.lock.Lock()
defer q.lock.Unlock()
for _, ch := range q.subscribers {
ch <- msg
}
}
func main() {
wg := &sync.WaitGroup{}
q := NewQueue()
ch1 := q.Sub("wg1")
ch2 := q.Sub("wg2")
w1 := NewWorker("wg1", ch1, wg)
w2 := NewWorker("wg2", ch2, wg)
go w1.Run(time.Second)
go w2.Run(time.Second * 2)
q.Pub("msg1")
q.Pub("msg2")
time.Sleep(time.Second * 3)
q.Unsub("wg1")
time.Sleep(time.Second)
q.Pub("msg3")
time.Sleep(time.Second * 3)
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment