A zero-dependency, callback-based batcher
package reflector

import (
	"sync"
)

// Batcher knows how to batch a slice of objects, calling do for each batch
// until Next returns false.
//
//	batcher.Next(func(objs []interface{}) {
//		fmt.Println(objs)
//	})
type Batcher interface {
	Next(do func([]interface{})) bool
}

type BatchConfig struct {
	// BatchSize is the maximum number of items in each batch.
	BatchSize int
}

// Does batcher implement Batcher?
var _ Batcher = (*batcher)(nil)

// batcher uses a batching config to provide batching functionality to a slice
// of static objects, or to objects provided from a channel.
type batcher struct {
	mu                 sync.Mutex
	objects            []interface{}
	config             BatchConfig
	hasChannelProvider bool
	provider           <-chan interface{}
}

// Next batches the objects loaded into the batcher and calls do with each
// batch, where the size of the batch is determined by the BatchConfig. This
// method is thread-safe, which allows the objects slice to be modified while
// the batcher is running.
//
// Next's batching logic depends largely on the provider of the objects. If
// the provider is static, and all the objects are initialized when the
// batcher is created, then batching is done by cutting a batch-sized slice
// off the front of the objects slice, updating the objects slice, and calling
// do with the batch.
//
// If, however, the batcher is fed by a channel, the batching logic has to
// differ. Because a channel provider can deliver objects over time, and
// because Next can be called repeatedly without any delay, the standard
// batching logic would produce many partial batches. The solution is instead
// to check whether we have a channel provider; if we do, Next becomes a no-op
// until b.objects holds at least a full batch. If the channel closes while we
// are waiting, we create a partial batch with all the objects currently
// contained in the batcher.
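//
// Callers typically drive Next in a loop until it reports that no more
// batches remain (a usage sketch, not part of the original API docs):
//
//	for b.Next(func(batch []interface{}) {
//		fmt.Println(batch)
//	}) {
//	}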
func (b *batcher) Next(do func([]interface{})) (ok bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	ok = true
	l := len(b.objects)
	s := b.config.BatchSize
	var c []interface{}
	if b.hasChannelProvider {
		if b.provider == nil {
			// The channel has closed and been drained: submit whatever is
			// left as a final, possibly partial, batch.
			c = make([]interface{}, len(b.objects))
			copy(c, b.objects)
			b.objects = make([]interface{}, 0)
			ok = false
		} else {
			if l < s {
				// Not enough objects for a full batch yet; no-op until the
				// provider delivers more.
				return ok
			}
			c = b.objects[:s]
			b.objects = b.objects[s:]
		}
	} else {
		if l <= s {
			c = make([]interface{}, len(b.objects))
			copy(c, b.objects)
			b.objects = make([]interface{}, 0)
			// If there is a channel feeding the batcher, we should only
			// return false once the channel is done; if there is no channel,
			// we can return false immediately.
			if b.provider == nil {
				ok = false
			}
		} else {
			c = b.objects[:s]
			b.objects = b.objects[s:]
		}
	}
	if len(c) == 0 {
		return ok
	}
	do(c)
	return ok
}

// AddObjs locks the batcher and appends any number of objects.
func (b *batcher) AddObjs(objs ...interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.objects = append(b.objects, objs...)
}

// FromChannel returns a new batcher supplied by the given channel. The
// channel must receive interface{} values and must be closed when it is
// finished: Next keeps returning true without emitting a final partial batch
// until the channel indicates that no more objects are on the way.
func FromChannel(objCh <-chan interface{}, config BatchConfig) *batcher {
	b := &batcher{
		config:             config,
		objects:            []interface{}{},
		hasChannelProvider: true,
		provider:           objCh,
	}
	go func() {
		for o := range objCh {
			b.AddObjs(o)
		}
		// Signal that the provider is exhausted so Next can flush the final
		// partial batch.
		b.mu.Lock()
		defer b.mu.Unlock()
		b.provider = nil
	}()
	return b
}
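
// A minimal sketch of driving a channel-backed batcher (illustrative only,
// not part of the original gist): a producer feeds the channel and closes it,
// and the consumer loops on Next until the final partial batch is flushed.
//
//	ch := make(chan interface{})
//	go func() {
//		for i := 0; i < 10; i++ {
//			ch <- i
//		}
//		close(ch)
//	}()
//	b := FromChannel(ch, BatchConfig{BatchSize: 4})
//	for b.Next(func(batch []interface{}) {
//		fmt.Println(batch) // e.g. [0 1 2 3], [4 5 6 7], then [8 9]
//	}) {
//	}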

// FromObjects returns a new batcher supplied by the provided objects.
func FromObjects(objects []interface{}, config BatchConfig) *batcher {
	return &batcher{
		objects: objects,
		config:  config,
	}
}
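
// A minimal sketch of static batching (illustrative only): seven objects with
// a batch size of three yield batches of 3, 3, and 1, and Next returns false
// once the final partial batch has been delivered.
//
//	b := FromObjects([]interface{}{1, 2, 3, 4, 5, 6, 7}, BatchConfig{BatchSize: 3})
//	for b.Next(func(batch []interface{}) {
//		fmt.Println(batch) // [1 2 3], [4 5 6], [7]
//	}) {
//	}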

// Does noopBatcher implement Batcher?
var _ Batcher = (*noopBatcher)(nil)

// noopBatcher implements the Batcher interface but provides all objects in a
// single Next call rather than using a traditional batching methodology.
type noopBatcher struct {
	objects []interface{}
}

// Next calls do with all the objects in the batcher and returns false,
// indicating that there are no more values to iterate over.
func (n *noopBatcher) Next(do func([]interface{})) bool {
	do(n.objects)
	return false
}

// NoopFromObjects returns a new noopBatcher with the provided objects.
func NoopFromObjects(objects []interface{}) *noopBatcher {
	return &noopBatcher{objects: objects}
}
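
// A minimal sketch of the no-op batcher (illustrative only): every object is
// delivered in a single call to do, and Next immediately reports completion.
//
//	n := NoopFromObjects([]interface{}{"a", "b", "c"})
//	n.Next(func(batch []interface{}) {
//		fmt.Println(batch) // [a b c]
//	})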