Created
August 15, 2021 15:02
-
-
Save s-l-teichmann/550b0a053d68d534d941f65cf0773658 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package datastore | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"sync" | |
) | |
// cacheSetFunc fetches the values for keys and hands each fetched value to the
// cache by calling set(key, value). A non-nil return value reports that the
// fetch failed.
type cacheSetFunc func(keys []string, set func(key string, value []byte)) error
// cache is an in-memory store for datastore values.
//
// A key is in exactly one of three states:
//   - cached: the key has an entry in data. A nil entry means the cache knows
//     that the key does not exist in the datastore.
//   - pending: the key has a channel in pending, i.e. a request to the
//     datastore for it is in flight. The channel is closed once the key is
//     resolved.
//   - unknown: the key is in neither map.
//
// Values equal to []byte("null") are meant to be kept as nil.
// NOTE(review): confirm that every write path performs this normalization.
//
// The embedded RWMutex has to be held while the maps are accessed. The zero
// value has nil maps; use newCache() to create an instance.
type cache struct {
	sync.RWMutex

	// data holds the resolved value of every cached key.
	data map[string][]byte
	// pending holds one channel per key that is currently being fetched.
	pending map[string]chan struct{}
}
// newCache creates an initialized cache instance. | |
func newCache() *cache { | |
return &cache{ | |
data: map[string][]byte{}, | |
pending: map[string]chan struct{}{}, | |
} | |
} | |
// GetOrSet returns the values for a list of keys. If one or more keys do not | |
// exist in the cache, then the missing values are fetched with the given set | |
// function. If this method is called more then once at the same time, only the | |
// first call fetches the result, the other calles get blocked until it the | |
// answer was fetched. | |
// | |
// A non existing value is returned as nil. | |
// | |
// All values get returned together. If only one key is missing, this function | |
// blocks, until all values are retrieved. | |
// | |
// The set function is used to create the cache values. It is called only with | |
// the missing keys. | |
// | |
// If a value is not returned by the set function, it is saved in the cache as | |
// nil to prevent a second call for the same key. | |
// | |
// If the context is done, GetOrSet returns. But the set() call is not stopped. | |
// Other calls to GetOrSet may wait for its result. | |
func (c *cache) GetOrSet(ctx context.Context, keys []string, set cacheSetFunc) ([]json.RawMessage, error) { | |
values := make([]json.RawMessage, len(keys)) | |
// If we don't have the data for the index-th key | |
// we have to wait on ch till it has arrived. | |
type pendingIndex struct { | |
index int | |
ch chan struct{} | |
} | |
var pendings []pendingIndex | |
// chan len of 1 spares a go routine if we don't have a background fetch. | |
errCh := make(chan error, 1) | |
c.lock(func(gc *cache) { | |
var missing []string | |
for i, key := range keys { | |
// If we have it place in result directly. | |
if v, ok := gc.data[key]; ok { | |
values[i] = v | |
continue | |
} | |
// We don't have it. Is it already pending? | |
ch := gc.pending[key] | |
if ch == nil { | |
// Nobody asked before so we have to do it ourself. | |
missing = append(missing, key) | |
ch = make(chan struct{}) | |
gc.pending[key] = ch | |
} | |
pendings = append(pendings, pendingIndex{i, ch}) | |
} | |
// We don't have to ask for any values. | |
if len(missing) == 0 { | |
errCh <- nil | |
return | |
} | |
// Do background query for missing keys. | |
go func() { | |
// If the background query fail we have to ensure | |
// that all pending requests are awoken. | |
success := map[string]bool{} | |
defer gc.lock(func(gc *cache) { | |
for _, key := range missing { | |
if !success[key] { | |
gc.store(key, nil) | |
} | |
} | |
}) | |
errCh <- set(missing, func(key string, value []byte) { | |
gc.lock(func(gc *cache) { | |
gc.store(key, value) | |
success[key] = true | |
}) | |
}) | |
}() | |
}) | |
select { | |
case err := <-errCh: | |
if err != nil { | |
return nil, fmt.Errorf("fetching key: %w", err) | |
} | |
case <-ctx.Done(): | |
return nil, fmt.Errorf("waiting for fetch missing: %w", ctx.Err()) | |
} | |
// Wait for the pending values. | |
for _, p := range pendings { | |
<-p.ch | |
c.rlock(func(gc *cache) { | |
values[p.index] = gc.data[keys[p.index]] | |
}) | |
} | |
return values, nil | |
} | |
// store stores a value under a given key and wakes pending requests. | |
func (c *cache) store(key string, value []byte) { | |
c.data[key] = value | |
if ch := c.pending[key]; ch != nil { | |
delete(c.pending, key) | |
close(ch) | |
} | |
} | |
// SetIfExist updates the cache if the key exists or is pending. | |
func (c *cache) SetIfExist(key string, value []byte) { | |
c.Lock() | |
defer c.Unlock() | |
c.setIfExistUnlocked(key, value) | |
} | |
// SetIfExistMany is like SetIfExist but with many keys. | |
func (c *cache) SetIfExistMany(data map[string]json.RawMessage) { | |
c.Lock() | |
defer c.Unlock() | |
for key, value := range data { | |
c.setIfExistUnlocked(key, value) | |
} | |
} | |
// lock calls fn under a write lock. | |
func (c *cache) lock(fn func(*cache)) { | |
c.Lock() | |
defer c.Unlock() | |
fn(c) | |
} | |
// lock calls fn under a read lock. | |
func (c *cache) rlock(fn func(*cache)) { | |
c.RLock() | |
defer c.RUnlock() | |
fn(c) | |
} | |
// reset clears the cache. | |
func (c *cache) reset() { | |
c.Lock() | |
defer c.Unlock() | |
// Only delete entries which are not pending. | |
for key := range c.data { | |
if _, ok := c.pending[key]; !ok { | |
delete(c.data, key) | |
} | |
} | |
} | |
// setIfExistUnlocked is like setIfExist but without setting a lock. Should not be | |
// used directly. | |
func (c *cache) setIfExistUnlocked(key string, value []byte) { | |
pending := c.pending[key] | |
_, exists := c.data[key] | |
if pending == nil && !exists { | |
return | |
} | |
c.data[key] = value | |
if pending != nil { | |
close(pending) | |
delete(c.pending, key) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment