Skip to content

Instantly share code, notes, and snippets.

@clarkmcc
Last active September 16, 2020 23:44
Show Gist options
  • Save clarkmcc/2248779de2ace85c2fe8a5af68f5fc6d to your computer and use it in GitHub Desktop.
A zero dependency, callback based batcher
package reflector
import (
"sync"
)
// Batcher knows how to batch a slice of objects, calling do for each batch
// until Next returns false.
//
//	batcher.Next(func(objs []interface{}) {
//		fmt.Println(objs)
//	})
type Batcher interface {
	// Next hands the next batch to do and reports whether more batches may
	// follow; callers typically loop on it until it returns false.
	Next(do func([]interface{})) bool
}
// BatchConfig controls how a batcher splits its objects into batches.
type BatchConfig struct {
	// BatchSize represents the maximum number of items in each batch.
	BatchSize int
}
// Compile-time check that *batcher implements Batcher.
var _ Batcher = (*batcher)(nil)

// batcher uses a batching config to provide batching functionality to a slice
// of static objects or objects provided from a channel.
//
// All fields are guarded by mu so the batcher can be fed (via AddObjs or the
// FromChannel goroutine) while Next is being called.
type batcher struct {
	mu      sync.Mutex
	objects []interface{} // objects not yet handed out in a batch
	config  BatchConfig
	// hasChannelProvider selects the channel-based batching strategy in Next.
	hasChannelProvider bool
	// provider is the feeding channel; set to nil once that channel closes.
	provider <-chan interface{}
}
// Next batches objects loaded into the batcher and calls do with each batch,
// where the size of a batch is bounded by BatchConfig.BatchSize. It returns
// true while more batches may follow and false once the batcher is drained.
// This method is threadsafe, which allows the objects slice to be modified
// while the batcher is running.
//
// NOTE: do is invoked while the batcher's lock is held, so do must not call
// AddObjs or Next on the same batcher.
//
// Batching logic is based largely on the provider of the objects. If the
// provider is static and all objects were initialized when the batcher was
// created, each call cuts BatchSize objects off the front of the slice; the
// final call delivers the remainder and returns false.
//
// If the batcher is based on a channel provider, objects arrive over time and
// Next can be called over and over without delay, so cutting whatever happens
// to be buffered would produce a lot of partial batches. Instead, while the
// channel is open, Next is a no-op until b.objects reaches the batch size.
// Once the channel closes, the buffered objects are drained in BatchSize
// chunks (never one oversized batch), with the last, possibly partial, batch
// accompanied by a false return.
func (b *batcher) Next(do func([]interface{})) (ok bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	ok = true
	l := len(b.objects)
	s := b.config.BatchSize
	var c []interface{}
	switch {
	case b.hasChannelProvider && b.provider != nil:
		// Channel still open: wait (no-op) until a full batch accumulates.
		if l < s {
			return ok
		}
		c = b.objects[:s]
		b.objects = b.objects[s:]
	case l <= s:
		// Everything that is left fits in one final, possibly partial, batch.
		c = make([]interface{}, l)
		copy(c, b.objects)
		b.objects = make([]interface{}, 0)
		// If there is a channel feeding the batcher, we only report done once
		// that channel has closed (provider == nil); with no channel we are
		// done immediately.
		if b.provider == nil {
			ok = false
		}
	default:
		// More than one batch remains buffered (static provider, or a closed
		// channel that left an oversized backlog): cut a BatchSize chunk.
		c = b.objects[:s]
		b.objects = b.objects[s:]
	}
	if len(c) == 0 {
		return ok
	}
	do(c)
	return ok
}
// AddObjs appends any number of objects to the batcher. It takes the
// batcher's lock, so it is safe to call concurrently with Next.
func (b *batcher) AddObjs(objs ...interface{}) {
	b.mu.Lock()
	b.objects = append(b.objects, objs...)
	b.mu.Unlock()
}
// FromChannel returns a new batcher supplied by the given channel. The channel
// must receive interface{}s and must be closed when finished, since Next will
// block (as a no-op) until the channel signals that no more objects are on the
// way.
func FromChannel(objCh <-chan interface{}, config BatchConfig) *batcher {
	b := &batcher{
		config:             config,
		objects:            []interface{}{},
		hasChannelProvider: true,
		provider:           objCh,
	}
	// Drain the channel into the batcher in the background; once it closes,
	// clear provider so Next knows no further objects will arrive.
	go func() {
		for obj := range objCh {
			b.AddObjs(obj)
		}
		b.mu.Lock()
		b.provider = nil
		b.mu.Unlock()
	}()
	return b
}
// FromObjects returns a new batcher pre-loaded with the provided static slice
// of objects.
func FromObjects(objects []interface{}, config BatchConfig) *batcher {
	b := &batcher{config: config}
	b.objects = objects
	return b
}
// Compile-time check that *noopBatcher implements Batcher.
var _ Batcher = (*noopBatcher)(nil)

// noopBatcher implements the Batcher interface but provides all objects in one
// Next() call rather than using a traditional batching methodology.
type noopBatcher struct {
	objects []interface{} // delivered all at once on the first Next call
}
// Next calls do with all the objects in the batcher and returns false,
// indicating there are no other values to iterate over.
//
// Note: do is invoked exactly once, even when the object slice is empty.
func (n *noopBatcher) Next(do func([]interface{})) bool {
	do(n.objects)
	return false
}
// NoopFromObjects returns a new noopBatcher holding the provided objects.
func NoopFromObjects(objects []interface{}) *noopBatcher {
	n := &noopBatcher{}
	n.objects = objects
	return n
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment