Skip to content

Instantly share code, notes, and snippets.

@maxsei
Created July 5, 2024 19:05
Show Gist options
  • Select an option

  • Save maxsei/831068666c1bc086d37fc8a73027ef5a to your computer and use it in GitHub Desktop.

Select an option

Save maxsei/831068666c1bc086d37fc8a73027ef5a to your computer and use it in GitHub Desktop.
bus implementation with a few different publishing policies
package bus
import (
"errors"
"slices"
"sync"
)
// ErrBusClosed is returned by Pub, Sub, Unsub and Close when the bus has
// already been closed.
var ErrBusClosed = errors.New("closed")

// ErrSubscriberNotFound is returned by Unsub when the given channel is not
// among the bus's current subscribers.
var ErrSubscriberNotFound = errors.New("subscriber not found")
// DeliveryMode selects how Bus.Pub hands a message to each subscriber.
type DeliveryMode byte

// Delivery modes understood by Bus.Pub.
const (
	// DeliveredOnce sends to every subscriber with a blocking send, waiting
	// for each one in turn. It is the zero value of DeliveryMode.
	DeliveredOnce DeliveryMode = iota

	// DeliveredOnceWithUnsub sends without blocking; a subscriber that cannot
	// accept the message immediately is unsubscribed.
	DeliveredOnceWithUnsub

	// DeliveredAtMostOnce sends without blocking; the message is skipped for
	// any subscriber that cannot accept it immediately.
	DeliveredAtMostOnce
)
// Publisher is implemented by values that can publish a message on a Bus.
//
// NOTE(review): presumably intended as a pluggable delivery policy; nothing
// in this file calls Pub on a Publisher yet — confirm the intended use.
type Publisher[T any] interface {
	// Pub publishes msg on b.
	Pub(b *Bus[T], msg T)
}
// NewBus returns a pointer to a zero-valued Bus: open, with no subscribers,
// and with its mode left at the zero value (DeliveredOnce).
//
// NOTE(review): publisher is accepted but not used or stored here — confirm
// whether it was meant to select the delivery policy.
func NewBus[T any](publisher Publisher[T]) *Bus[T] {
	return new(Bus[T])
}
// Bus fans messages of type T out to a set of subscriber channels.
//
// A Bus contains a mutex and must not be copied; use it through a pointer.
type Bus[T any] struct {
	// m guards subscribers and closed. Pub takes it for reading; Sub, Unsub
	// and Close take it for writing.
	m sync.RWMutex
	// subscribers holds one channel per active subscription.
	subscribers []chan T
	// closed is set by Close; once true every method returns ErrBusClosed.
	closed bool

	// mode selects the delivery strategy used by Pub.
	mode DeliveryMode
}
// Pub delivers msg to every current subscriber according to the bus's
// delivery mode:
//
//   - DeliveredOnce: blocking send to each subscriber in turn.
//   - DeliveredAtMostOnce: non-blocking send; msg is skipped for any
//     subscriber that cannot accept it immediately.
//   - DeliveredOnceWithUnsub: non-blocking send; any subscriber that cannot
//     accept msg immediately is removed with Unsub, which closes its channel.
//
// The read lock is held while sending, so in DeliveredOnce mode a subscriber
// that never receives keeps Sub, Unsub and Close waiting.
//
// It returns ErrBusClosed if the bus has been closed and nil otherwise.
func (b *Bus[T]) Pub(msg T) error {
	b.m.RLock()
	if b.closed {
		b.m.RUnlock()
		return ErrBusClosed
	}

	// Subscribers to drop in DeliveredOnceWithUnsub mode. They are collected
	// here and removed after the read lock is released.
	var stale []chan T

	switch b.mode {
	case DeliveredOnce:
		for _, sub := range b.subscribers {
			sub <- msg
		}
	case DeliveredAtMostOnce:
		for _, sub := range b.subscribers {
			select {
			case sub <- msg:
			default:
			}
		}
	case DeliveredOnceWithUnsub:
		for _, sub := range b.subscribers {
			select {
			case sub <- msg:
			default:
				stale = append(stale, sub)
			}
		}
	}
	b.m.RUnlock()

	// Unsub takes the write lock, so it must only run once the read lock
	// above has been released.
	for _, sub := range stale {
		// Best effort: the subscriber may already have been removed, or the
		// bus closed, between releasing the read lock and this call.
		_ = b.Unsub(sub)
	}
	return nil
}
// Sub registers a new subscriber and returns its receive-only channel.
//
// The channel has a buffer of one message. It is closed when the subscriber
// is removed with Unsub or when the bus is closed.
//
// It returns ErrBusClosed (and a nil channel) if the bus has been closed.
func (b *Bus[T]) Sub() (<-chan T, error) {
	b.m.Lock()
	// Deferred so the lock is released on the closed-bus return as well.
	defer b.m.Unlock()

	if b.closed {
		return nil, ErrBusClosed
	}
	ch := make(chan T, 1)
	b.subscribers = append(b.subscribers, ch)
	return ch, nil
}
// Unsub removes the subscriber whose channel is sub and closes that channel.
//
// It returns ErrBusClosed if the bus has been closed, ErrSubscriberNotFound
// if sub is not a current subscriber, and nil on success.
func (b *Bus[T]) Unsub(sub <-chan T) error {
	b.m.Lock()
	defer b.m.Unlock()

	if b.closed {
		return ErrBusClosed
	}

	// Locate the first registered channel that is the one handed out as sub.
	idx := slices.IndexFunc(b.subscribers, func(ch chan T) bool {
		return ch == sub
	})
	if idx < 0 {
		return ErrSubscriberNotFound
	}

	close(b.subscribers[idx])
	b.subscribers = slices.Delete(b.subscribers, idx, idx+1)
	return nil
}
// Close marks the bus as closed and closes every subscriber channel.
//
// After Close, Pub, Sub, Unsub and further calls to Close all return
// ErrBusClosed. The first call returns nil.
func (b *Bus[T]) Close() error {
	b.m.Lock()
	// Deferred so the lock is released on the already-closed return as well.
	defer b.m.Unlock()

	if b.closed {
		return ErrBusClosed
	}
	b.closed = true
	for _, sub := range b.subscribers {
		close(sub)
	}
	// No method reads subscribers once closed is set, so drop the slice and
	// let the closed channels be collected.
	b.subscribers = nil
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment