Skip to content

Instantly share code, notes, and snippets.

@s-l-teichmann
Created August 15, 2021 15:02
Show Gist options
  • Save s-l-teichmann/550b0a053d68d534d941f65cf0773658 to your computer and use it in GitHub Desktop.
Save s-l-teichmann/550b0a053d68d534d941f65cf0773658 to your computer and use it in GitHub Desktop.
package datastore
import (
"context"
"encoding/json"
"fmt"
"sync"
)
// cacheSetFunc is a function to update cache keys.
//
// It is called with the keys whose values are needed and reports each value
// it obtained by calling set(key, value). A non-nil return value signals that
// the update failed.
type cacheSetFunc func(keys []string, set func(key string, value []byte)) error
// cache stores the values of the datastore.
//
// Each key of the cache has one of three states: it exists, it does not
// exist, or it is pending. Pending means that there is a request to the
// datastore in flight for that key. An existing key can have the value nil,
// which means that the cache knows that the key does not exist in the
// datastore. A value of []byte("null") is intended to be treated the same way,
// i.e. to be changed to nil.
//
// A new cache instance has to be created with newCache().
type cache struct {
// Embedded lock guarding data and pending.
sync.RWMutex
// data holds the cached value per key; a nil value marks a key that is
// known to be absent.
data map[string][]byte
// pending holds, per key that is currently being fetched, a channel that is
// closed once a value for the key has been stored.
pending map[string]chan struct{}
}
// newCache returns a ready-to-use cache with both of its maps allocated and
// empty.
func newCache() *cache {
	c := new(cache)
	c.data = make(map[string][]byte)
	c.pending = make(map[string]chan struct{})
	return c
}
// GetOrSet returns the values for a list of keys. If one or more keys do not
// exist in the cache, then the missing values are fetched with the given set
// function. If this method is called more than once at the same time, only
// the first call fetches the result; the other calls block until the answer
// has been fetched.
//
// A non-existing value is returned as nil.
//
// All values get returned together. If only one key is missing, this function
// blocks until all values are retrieved.
//
// The set function is used to create the cache values. It is called only with
// the missing keys and runs in a background goroutine.
//
// If set succeeds but does not deliver a value for a key, that key is saved in
// the cache as nil to prevent a second call for the same key. If set fails,
// the keys it did not deliver are not cached: requests waiting for them are
// woken and see nil, and a later call fetches them again.
//
// If the context is done, GetOrSet returns an error wrapping ctx.Err(). This
// applies both while waiting for the own fetch and while waiting for keys
// that another call is fetching. The set() call is not stopped; other calls
// to GetOrSet may wait for its result.
func (c *cache) GetOrSet(ctx context.Context, keys []string, set cacheSetFunc) ([]json.RawMessage, error) {
	values := make([]json.RawMessage, len(keys))

	// waiter records that the value for keys[index] is not cached yet and
	// can be read from the cache once ch has been closed.
	type waiter struct {
		index int
		ch    chan struct{}
	}
	var waiters []waiter

	// A buffer of 1 spares a goroutine when nothing has to be fetched and
	// keeps the background fetch from blocking on a caller that already
	// returned because its context was done.
	errCh := make(chan error, 1)

	c.lock(func(gc *cache) {
		var missing []string
		for i, key := range keys {
			// Cached: place it in the result directly.
			if v, ok := gc.data[key]; ok {
				values[i] = v
				continue
			}

			// Not cached. If nobody is fetching it yet, this call has to.
			ch := gc.pending[key]
			if ch == nil {
				missing = append(missing, key)
				ch = make(chan struct{})
				gc.pending[key] = ch
			}
			waiters = append(waiters, waiter{index: i, ch: ch})
		}

		// Nothing to fetch ourselves.
		if len(missing) == 0 {
			errCh <- nil
			return
		}

		// Fetch the missing keys in the background.
		go func() {
			delivered := make(map[string]bool, len(missing))
			var fetchErr error

			// Whatever happens, every request waiting for one of the
			// missing keys has to be woken up.
			defer gc.lock(func(gc *cache) {
				for _, key := range missing {
					if delivered[key] {
						continue
					}
					if fetchErr == nil {
						// The fetch worked but had no value for the key:
						// remember that the key does not exist.
						gc.store(key, nil)
						continue
					}
					// The fetch failed: wake the waiters without caching
					// anything, so a transient error is not remembered as
					// "key does not exist".
					if ch := gc.pending[key]; ch != nil {
						delete(gc.pending, key)
						close(ch)
					}
				}
			})

			fetchErr = set(missing, func(key string, value []byte) {
				gc.lock(func(gc *cache) {
					gc.store(key, value)
					delivered[key] = true
				})
			})
			errCh <- fetchErr
		}()
	})

	select {
	case err := <-errCh:
		if err != nil {
			return nil, fmt.Errorf("fetching key: %w", err)
		}
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for fetch missing: %w", ctx.Err())
	}

	// Wait for the pending values. Keys fetched by another call may take
	// arbitrarily long, so the context has to be honored here as well.
	for _, w := range waiters {
		select {
		case <-w.ch:
		case <-ctx.Done():
			return nil, fmt.Errorf("waiting for pending keys: %w", ctx.Err())
		}
		c.rlock(func(gc *cache) {
			values[w.index] = gc.data[keys[w.index]]
		})
	}
	return values, nil
}
// store saves value under key and wakes the requests that are pending on key
// by closing and removing the key's pending channel.
//
// A value of []byte("null") is stored as nil, so that a JSON null and a key
// that does not exist are represented the same way in the cache.
//
// store does no locking itself; the caller has to hold the write lock.
func (c *cache) store(key string, value []byte) {
	// The comparison does not allocate; the compiler handles string(b) == "lit".
	if string(value) == "null" {
		value = nil
	}
	c.data[key] = value

	if ch := c.pending[key]; ch != nil {
		delete(c.pending, key)
		close(ch)
	}
}
// SetIfExist writes value for key while holding the write lock, but only when
// the key is already cached or currently pending.
func (c *cache) SetIfExist(key string, value []byte) {
	c.lock(func(locked *cache) {
		locked.setIfExistUnlocked(key, value)
	})
}
// SetIfExistMany applies SetIfExist semantics to every entry of data, taking
// the write lock once for the whole batch.
func (c *cache) SetIfExistMany(data map[string]json.RawMessage) {
	c.lock(func(locked *cache) {
		for k, v := range data {
			locked.setIfExistUnlocked(k, v)
		}
	})
}
// lock runs fn with the cache's write lock held and releases the lock when fn
// returns. fn receives the cache itself.
func (c *cache) lock(fn func(*cache)) {
	c.RWMutex.Lock()
	defer c.RWMutex.Unlock()

	fn(c)
}
// rlock runs fn with the cache's read lock held and releases the lock when fn
// returns. fn receives the cache itself.
func (c *cache) rlock(fn func(*cache)) {
	c.RWMutex.RLock()
	defer c.RWMutex.RUnlock()

	fn(c)
}
// reset empties the cache under the write lock. Entries whose key is
// currently pending are kept.
func (c *cache) reset() {
	c.lock(func(locked *cache) {
		for key := range locked.data {
			if _, isPending := locked.pending[key]; isPending {
				continue
			}
			delete(locked.data, key)
		}
	})
}
// setIfExistUnlocked is like SetIfExist but without taking the lock. It should
// not be used directly; the caller has to hold the write lock.
//
// The value is only written when the key is already cached or currently
// pending; unknown keys are ignored. A value of []byte("null") is stored as
// nil, so that a JSON null and a key that does not exist are represented the
// same way in the cache. Requests pending on the key are woken.
func (c *cache) setIfExistUnlocked(key string, value []byte) {
	_, exists := c.data[key]
	if !exists && c.pending[key] == nil {
		return
	}

	// The comparison does not allocate; the compiler handles string(b) == "lit".
	if string(value) == "null" {
		value = nil
	}

	// store writes the value and closes/removes the pending channel, if any.
	c.store(key, value)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment