Last active
July 11, 2018 01:31
-
-
Save kwiesmueller/59121f23c7968b5104a153d7beb61f03 to your computer and use it in GitHub Desktop.
Channel based fanOut implementation in Golang with dynamic, threadsafe subscriptions and unsubscriptions sending and reveiving from a potential number of n publishers and subscribers. This implementation relies solely onto channels and does not use mutexes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"math/rand" | |
"os" | |
"sync/atomic" | |
"time" | |
"go.uber.org/zap" | |
"github.com/seibert-media/golibs/log" | |
) | |
var logger *log.Logger | |
var publishCount int64 | |
var resultCount int64 | |
var realReceive int64 | |
const ( | |
// amount of subscribers | |
subscriberCount int = 20 | |
// amount of publishers | |
publisherCount int = 1 | |
// amount of events sent by each publisher | |
eventCount int = 10000 | |
// enable cancellation | |
enableCancel bool = true | |
// debug write to file | |
writeToFiles bool = false | |
) | |
var ( | |
// time until a publishers sends the next event | |
publishSleep time.Duration = (time.Duration(rand.Intn(5)) * time.Nanosecond) | |
// time until the dispatcher gives up sending to a subscription | |
subscriptionTimeout time.Duration = time.Second * 1 | |
// time until the dispatcher gives up finding a subscription | |
dispatchTimeout time.Duration = time.Millisecond * 1 | |
// time until the dispatcher gives up looking for new events | |
noEventTimeout time.Duration = time.Millisecond * 1 | |
// time until subscriptions get closed | |
earlyCloseDuration = func() time.Duration { return time.Duration(rand.Intn(5)) * time.Second } | |
// time until new subscriptions get added | |
lateAddDuration = func() time.Duration { return time.Duration(rand.Intn(5)) * time.Second } | |
) | |
func elapsed() func() { | |
start := time.Now() | |
return func() { | |
logger.Info("finished", | |
zap.Time("started", start), | |
zap.Duration("took", time.Since(start))) | |
} | |
} | |
func main() { | |
logger = log.New("", true) | |
resultCount = 0 | |
p := &Publisher{ | |
events: make(chan *Event), | |
done: make(chan error), | |
listeners: make(chan *subscription), | |
} | |
logger.Info("starting") | |
defer elapsed()() | |
go p.Dispatch() | |
for is := 1; is <= subscriberCount; is++ { | |
if is%2 == 0 && enableCancel { | |
go func(is int) { | |
<-time.After(lateAddDuration()) | |
p.buildSubscriber(fmt.Sprintf("%d", is)) | |
}(is) | |
} else { | |
cancel, s := p.buildSubscriber(fmt.Sprintf("%d", is)) | |
//p.buildSubscriber(fmt.Sprintf("%d", is)) | |
go func(s *subscription, cancel context.CancelFunc) { | |
<-time.After(earlyCloseDuration()) | |
if enableCancel { | |
logger.Info("canceling subscription", | |
zap.String("name", s.name), | |
zap.Int64("after", atomic.LoadInt64(&publishCount)), | |
zap.Int64("received", atomic.AddInt64(&realReceive, atomic.LoadInt64(&s.ec)))) | |
cancel() | |
} | |
}(s, cancel) | |
} | |
} | |
for ip := 0; ip < publisherCount; ip++ { | |
go func() { | |
i := 0 | |
for { | |
i++ | |
if i > eventCount { | |
return | |
} | |
atomic.AddInt64(&publishCount, 1) | |
p.Publish(&Event{id: int64(i), msg: fmt.Sprintf("%d", i), source: fmt.Sprintf("%d", ip)}) | |
time.Sleep(publishSleep) | |
} | |
logger.Info("finished publishing", | |
zap.Int64("count", atomic.LoadInt64(&publishCount))) | |
}() | |
} | |
<-p.done | |
} | |
func (p *Publisher) buildSubscriber(name string) (context.CancelFunc, *subscription) { | |
ctx, cancel := context.WithCancel(context.Background()) | |
var f *os.File | |
if writeToFiles { | |
f, _ = os.Create(fmt.Sprintf("subscribers/%s", name)) | |
} | |
s := p.Subscribe(ctx, name, make(chan *Event)) | |
go func(sub *subscription) { | |
for e := range sub.listener { | |
ec := atomic.AddInt64(&sub.ec, 1) | |
prv := atomic.AddInt64(&sub.previous, 1) | |
logger.Info("received event", | |
zap.String("event", e.msg), | |
zap.String("source", e.source), | |
zap.String("subscription", sub.name), | |
zap.Int64("ownCount", ec), | |
zap.Int64("globalCount", atomic.AddInt64(&resultCount, 1))) | |
if writeToFiles { | |
f.Write([]byte(fmt.Sprintln(e.msg))) | |
} | |
if e.id != prv { | |
logger.Warn("missing id", | |
zap.String("event", e.msg), | |
zap.String("source", e.source), | |
zap.String("subscription", sub.name), | |
zap.Int64("ownCount", ec), | |
zap.Int64("globalCount", atomic.LoadInt64(&resultCount)), | |
zap.Int64("id", e.id), | |
zap.Int64("prv", prv)) | |
} | |
} | |
if writeToFiles { | |
f.Close() | |
} | |
}(s) | |
return cancel, s | |
} | |
type Event struct { | |
id int64 | |
source string | |
msg string | |
} | |
type subscription struct { | |
name string | |
// closed on unsubscribe | |
ec int64 | |
previous int64 | |
done chan bool | |
listener chan *Event | |
} | |
type Publisher struct { | |
events chan *Event | |
done chan error | |
listeners chan *subscription | |
} | |
func (p *Publisher) Subscribe(ctx context.Context, name string, e chan *Event) *subscription { | |
s := &subscription{ | |
name: name, | |
ec: 0, | |
done: make(chan bool), | |
listener: e, | |
} | |
logger.Info("subscribing", zap.String("name", s.name), zap.Int64("after", atomic.LoadInt64(&resultCount))) | |
go func() { p.listeners <- s }() | |
go func() { | |
<-ctx.Done() | |
close(s.done) | |
s.listener = nil | |
}() | |
return s | |
} | |
func (p *Publisher) Publish(e *Event) { | |
p.events <- e | |
} | |
func (p *Publisher) Dispatch() error { | |
done := 0 | |
for { | |
select { | |
case err := <-p.done: | |
return err | |
case e := <-p.events: | |
var al []*subscription | |
loop: | |
for { | |
select { | |
case l := <-p.listeners: | |
select { | |
case _, ok := <-l.done: | |
if !ok { | |
logger.Info("subscription ended", | |
zap.String("event", e.msg), | |
zap.String("source", e.source), | |
zap.String("subscription", l.name)) | |
} | |
case l.listener <- e: | |
al = append(al, l) | |
case <-time.After(subscriptionTimeout): | |
logger.Info("subscription timeout", | |
zap.String("event", e.msg), | |
zap.String("source", e.source), | |
zap.String("subscription", l.name)) | |
} | |
case <-time.After(dispatchTimeout): | |
logger.Info("dispatch timeout", | |
zap.String("type", "not listening"), | |
zap.String("event", e.msg), | |
zap.String("source", e.source)) | |
break loop | |
} | |
} | |
for _, l := range al { | |
go func(l *subscription) { p.listeners <- l }(l) | |
} | |
case <-time.After(noEventTimeout): | |
done++ | |
logger.Info("dispatch timeout", zap.String("type", "no event")) | |
logger.Info("total event count", | |
zap.Int64("published", atomic.LoadInt64(&publishCount)), | |
zap.Int64("received", atomic.LoadInt64(&resultCount)), | |
zap.Int64("expected", int64((eventCount*subscriberCount)*publisherCount)), | |
zap.Int64("diff", int64((eventCount*subscriberCount)*publisherCount)-atomic.LoadInt64(&resultCount)), | |
zap.Int("events", eventCount), | |
zap.Int("subscribers", subscriberCount), | |
zap.Int64("realReceive", atomic.LoadInt64(&realReceive)), | |
) | |
if done > 10 { | |
p.done <- nil | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment