Skip to content

Instantly share code, notes, and snippets.

@KScaesar
Last active July 15, 2024 09:20
Show Gist options
  • Select an option

  • Save KScaesar/d297e9daa22879dd8d9c9f6b1258b333 to your computer and use it in GitHub Desktop.

Select an option

Save KScaesar/d297e9daa22879dd8d9c9f6b1258b333 to your computer and use it in GitHub Desktop.
// ReadCacheProxy is a read-through helper: Read asks ReadCache first and, when
// that returns an error, falls back to ReadDatabase and hands the loaded value
// to WriteCache. Reads that overlap on the same Key are collapsed so that one
// caller performs the fallback while the others wait for its result.
type ReadCacheProxy[Input, ViewModel any] struct {
	// ReadCache is tried first; any non-nil error is treated as a miss.
	ReadCache func(input *Input) (output *ViewModel, err error)

	// ReadDatabase is called after ReadCache reported an error.
	ReadDatabase func(input *Input) (output *ViewModel, err error)

	// WriteCache receives the value returned by ReadDatabase.
	WriteCache func(output *ViewModel) error

	// Key is the entry under which an in-flight read is registered in Locks.
	Key string

	// Locks maps a key to the wait function of the read currently in flight.
	Locks *sync.Map
}

// Read returns the cached value when ReadCache succeeds. Otherwise it either
// becomes the single in-flight reader for proxy.Key, or waits for the reader
// that is already registered and returns that reader's result.
func (proxy ReadCacheProxy[Input, ViewModel]) Read(input *Input) (output *ViewModel, err error) {
	var hit *ViewModel
	if hit, err = proxy.ReadCache(input); err == nil {
		return hit, nil
	}

	// awaitLeader is what other callers run while this call is in flight: it
	// blocks until released is closed and then reports this call's results,
	// which is why it captures the named return values.
	released := make(chan struct{})
	awaitLeader := func() (*ViewModel, error) {
		<-released
		return output, err
	}

	inflight, exists := proxy.Locks.LoadOrStore(proxy.Key, awaitLeader)
	if exists {
		// Another call already owns the key; share its outcome.
		return inflight.(func() (*ViewModel, error))()
	}

	// This call owns the key: wake the waiters, then free the slot.
	defer func() {
		close(released)
		proxy.Locks.Delete(proxy.Key)
	}()
	return proxy.read(input)
}

// read checks the cache once more, then loads from the database and writes the
// loaded value to the cache. A failure of ReadDatabase or WriteCache is
// returned with a nil value.
func (proxy *ReadCacheProxy[Input, ViewModel]) read(input *Input) (output *ViewModel, err error) {
	if cached, missErr := proxy.ReadCache(input); missErr == nil {
		return cached, nil
	}

	fresh, loadErr := proxy.ReadDatabase(input)
	if loadErr != nil {
		return nil, loadErr
	}

	if storeErr := proxy.WriteCache(fresh); storeErr != nil {
		return nil, storeErr
	}
	return fresh, nil
}
// WriteCacheProxy pairs a database write with a cache deletion for the same
// value; Write runs them in that order.
type WriteCacheProxy[DomainModel any, Func func(val *DomainModel) error] struct {
	// WriteDatabase is the first step of Write.
	WriteDatabase Func

	// DeleteCache is the second step of Write; it runs only when
	// WriteDatabase returned nil.
	DeleteCache Func
}

// Write calls WriteDatabase and, if that succeeds, DeleteCache. It returns the
// first error encountered; DeleteCache is skipped when WriteDatabase fails.
func (proxy WriteCacheProxy[DomainModel, Func]) Write(val *DomainModel) error {
	if dbErr := proxy.WriteDatabase(val); dbErr != nil {
		return dbErr
	}
	return proxy.DeleteCache(val)
}
package main
import (
"errors"
"log"
"sync"
)
// main starts 100 goroutines that each read the key "gg" through
// ReadCacheProxy.FastRead and log the result. The ReadPrimary callback panics
// if it is ever invoked a second time.
func main() {
	const (
		hotKey  = "gg"
		workers = 100
	)

	var (
		store    sync.Map
		inflight sync.Map
		dbReads  int
		pending  sync.WaitGroup
	)

	// fromCache reports a miss as an error so the proxy falls through.
	fromCache := func() (*string, error) {
		if value, ok := store.Load(hotKey); ok {
			return value.(*string), nil
		}
		return nil, errors.New("not found")
	}

	// fromDB counts its invocations and panics on the second one.
	fromDB := func() (*string, error) {
		dbReads++
		val := "read database"
		if dbReads != 1 {
			panic("multi request to db")
		}
		return &val, nil
	}

	toCache := func(val *string) error {
		store.Store(hotKey, val)
		return nil
	}

	pending.Add(workers)
	for i := 0; i < workers; i++ {
		go func(id int) {
			defer pending.Done()

			proxy := ReadCacheProxy[string, func() (*string, error), func(*string) error]{
				ReadSecondary:  fromCache,
				ReadPrimary:    fromDB,
				WriteSecondary: toCache,
				ShardLocks:     &inflight,
			}

			output, err := proxy.FastRead(hotKey)
			if err != nil {
				panic(err)
			}
			log.Println(id, *output)
		}(i)
	}
	pending.Wait()
}
// ReadCacheProxy manages read and write operations between a database and a
// cache. It serves data from the Secondary when possible and otherwise reads
// from the Primary, while keeping a burst of concurrent requests for the same
// key from reaching the Primary (the hotspot-invalidation and cache-avalanche
// problems).
//
// (Primary, Secondary) can stand for different kinds of storage, for example
// (Database, Cache) or (LocalCache, RemoteCache).
type ReadCacheProxy[ViewModel any, rFunc func() (*ViewModel, error), wFunc func(*ViewModel) error] struct {
	// ReadSecondary is tried first; a non-nil error is treated as a miss.
	ReadSecondary rFunc

	// ReadPrimary is called after ReadSecondary reported an error.
	ReadPrimary rFunc

	// WriteSecondary receives the value returned by ReadPrimary.
	WriteSecondary wFunc

	// ShardLocks maps a key to the wait function of the read in flight.
	ShardLocks *sync.Map
}

// FastRead returns the Secondary's value when it is available without taking
// part in the per-key coordination; on a miss it defers to SafeRead.
func (proxy ReadCacheProxy[ViewModel, rFunc, wFunc]) FastRead(key any) (output *ViewModel, err error) {
	if hit, missErr := proxy.ReadSecondary(); missErr == nil {
		return hit, nil
	}
	return proxy.SafeRead(key)
}

// SafeRead lets one caller per key run the read while every overlapping caller
// for that key waits and receives the same output and error.
func (proxy ReadCacheProxy[ViewModel, rFunc, wFunc]) SafeRead(key any) (output *ViewModel, err error) {
	// awaitLeader is registered for other callers: it blocks until finished is
	// closed and then reports this call's named results.
	finished := make(chan struct{})
	awaitLeader := func() (*ViewModel, error) {
		<-finished
		return output, err
	}

	entry, exists := proxy.ShardLocks.LoadOrStore(key, awaitLeader)
	if exists {
		// Keep concurrent requests for the same key away from the Primary by
		// sharing the output of the first request.
		leaderResult := entry.(rFunc)
		return leaderResult()
	}

	// This call owns the key: wake the waiters, then free the slot.
	defer func() {
		close(finished)
		proxy.ShardLocks.Delete(key)
	}()
	return proxy.read()
}

// read checks the Secondary again, then loads from the Primary and writes the
// loaded value to the Secondary. A failure of ReadPrimary or WriteSecondary is
// returned with a nil value.
func (proxy ReadCacheProxy[ViewModel, rFunc, wFunc]) read() (output *ViewModel, err error) {
	if hit, missErr := proxy.ReadSecondary(); missErr == nil {
		return hit, nil
	}

	fresh, loadErr := proxy.ReadPrimary()
	if loadErr != nil {
		return nil, loadErr
	}

	if storeErr := proxy.WriteSecondary(fresh); storeErr != nil {
		return nil, storeErr
	}
	return fresh, nil
}
// WriteCacheProxy groups the two callbacks of a write path: one named for
// writing the database and one named for deleting the cache entry. No method
// is defined on it here; how the two are sequenced is left to the caller.
type WriteCacheProxy struct {
	WriteDatabase, DeleteCache func() error
}
package main
import (
"errors"
"log"
"sync"
"time"
)
// main fires 10000 concurrent reads of the key "gg" through
// ReadCacheProxy.ProtectAllNode, logs the elapsed time and the number of
// replica hits, and panics unless the primary was read exactly once.
func main() {
	const (
		hotKey   = "gg"
		requests = 10000
	)

	var (
		replica  sync.Map
		flights  Singleflight[string, func() (*string, error)]
		counters sync.Mutex
		hitDB    int
		hitCache int
		pending  sync.WaitGroup
	)

	// count increments one of the hit counters under the shared mutex.
	count := func(counter *int) {
		counters.Lock()
		*counter++
		counters.Unlock()
	}

	startedAt := time.Now()

	pending.Add(requests)
	for i := 0; i < requests; i++ {
		go func() {
			defer pending.Done()

			readProxy := ReadCacheProxy[string, func() (*string, error), func(*string) error]{
				ReadReplica: func() (*string, error) {
					if value, ok := replica.Load(hotKey); ok {
						count(&hitCache)
						return value.(*string), nil
					}
					return nil, errors.New("not found")
				},
				ReadPrimary: func() (*string, error) {
					count(&hitDB)
					val := "read val"
					return &val, nil
				},
				WriteReplica: func(val *string) error {
					replica.Store(hotKey, val)
					return nil
				},
				SingleFlight: &flights,
			}

			// readProxy.ProtectPrimaryNode(hotKey) is the alternative entry
			// point; the returned value is not inspected here.
			if _, err := readProxy.ProtectAllNode(hotKey); err != nil {
				panic(err)
			}
		}()
	}
	pending.Wait()

	finishedAt := time.Now()
	log.Println("cost", finishedAt.Sub(startedAt).String())
	log.Println("hit cache", hitCache)

	if hitDB != 1 {
		panic("multi req to db")
	}
}
// Singleflight collapses calls that share a key: the first call for a key runs
// its function, and every later call for that key receives the first call's
// result until the key is forgotten. The zero value is ready to use.
type Singleflight[Val any, F func() (*Val, error)] struct {
	// shardLocks maps a key to the wait function of the call that owns it.
	shardLocks sync.Map
}

// Do executes and returns the results of the given function,
// making sure that only one execution is in-flight for a given key at a time.
//
// If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether val was given to multiple callers.
//
// The entry for key is kept after fn returns; call Forget or Expire to let a
// later Do run its function again. If fn does not return normally (it panics
// or exits the goroutine), callers sharing the entry receive a non-nil error
// rather than a nil value paired with a nil error.
//
// https://pkg.go.dev/golang.org/x/sync/singleflight#Group.Do
func (group *Singleflight[Val, F]) Do(key string, fn F) (val *Val, err error, shared bool) {
	// wait is what duplicate callers run: it blocks until done is closed and
	// then reports the owner's named results.
	done := make(chan struct{})
	wait := func() (*Val, error) {
		<-done
		return val, err
	}

	owner, loaded := group.shardLocks.LoadOrStore(key, wait)
	if loaded {
		val, err = owner.(func() (*Val, error))()
		return val, err, true
	}

	// completed stays false when fn panics, so the deferred function can
	// publish an error to the waiters before releasing them.
	completed := false
	defer func() {
		if !completed {
			val, err = nil, errors.New("singleflight: fn did not complete")
		}
		close(done)
	}()

	val, err = fn()
	completed = true
	return val, err, false
}

// Forget tells the singleflight to forget about a key.
// Future calls to Do for this key will call the function
// rather than waiting for an earlier call to complete.
func (group *Singleflight[Val, F]) Forget(key string) {
	group.shardLocks.Delete(key)
}

// Expire schedules a Forget operation for a given key after a specified duration.
//
// Note: Singleflight is designed to prevent cache hotspot invalidation (cache penetration)
// by ensuring that only one execution is in-flight for a given key at a time.
// It should not be used as a cache.
func (group *Singleflight[Val, F]) Expire(key string, t time.Duration) {
	time.AfterFunc(t, func() {
		group.Forget(key)
	})
}
// ReadCacheProxy manages read and write operations between a database and a
// cache. It serves data from the Replica when possible and otherwise reads
// from the Primary.
//
// It also keeps a burst of concurrent requests for the same key from reaching
// the Primary (the hotspot-invalidation and cache-avalanche problems).
//
// (Primary, Replica) can stand for different kinds of storage, for example
// (Database, Cache) or (LocalCache, RemoteCache).
type ReadCacheProxy[ViewModel any, Read func() (*ViewModel, error), Write func(*ViewModel) error] struct {
	// ReadReplica is tried first; a non-nil error is treated as a miss.
	ReadReplica Read

	// ReadPrimary is called after ReadReplica reported an error.
	ReadPrimary Read

	// WriteReplica receives the value returned by ReadPrimary.
	WriteReplica Write

	// SingleFlight collapses reads that share a key.
	SingleFlight *Singleflight[ViewModel, Read]
}

// read tries the Replica, then loads from the Primary and writes the loaded
// value to the Replica. A failure of ReadPrimary or WriteReplica is returned
// with a nil value.
func (proxy ReadCacheProxy[ViewModel, Read, Write]) read() (output *ViewModel, err error) {
	if hit, missErr := proxy.ReadReplica(); missErr == nil {
		return hit, nil
	}

	fresh, loadErr := proxy.ReadPrimary()
	if loadErr != nil {
		return nil, loadErr
	}

	if storeErr := proxy.WriteReplica(fresh); storeErr != nil {
		return nil, storeErr
	}
	return fresh, nil
}

// ProtectPrimaryNode reads the Replica directly and only falls back to the
// single-flight path of ProtectAllNode when the Replica reports an error.
func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectPrimaryNode(key string) (output *ViewModel, err error) {
	if hit, missErr := proxy.ReadReplica(); missErr == nil {
		return hit, nil
	}
	return proxy.ProtectAllNode(key)
}

// ProtectAllNode routes the whole read (Replica first, then Primary) through
// SingleFlight, so overlapping calls for key share one execution of read. The
// entry is scheduled to be forgotten one second after the owning call returns.
func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectAllNode(key string) (output *ViewModel, err error) {
	// Only the call that actually ran read schedules the expiry. If every
	// sharer scheduled one too, each request would start its own timer, and a
	// sharer's late timer could forget a newer in-flight entry for the same
	// key, letting a second read start concurrently. The check is deferred so
	// that the expiry is still scheduled when Do panics (shared stays false).
	shared := false
	defer func() {
		if !shared {
			proxy.SingleFlight.Expire(key, time.Second)
		}
	}()

	output, err, shared = proxy.SingleFlight.Do(key, proxy.read)
	return output, err
}
// WriteCacheProxy groups the two callbacks of a write path: one named for
// writing the database and one named for deleting the cache entry. No method
// is defined on it here; how the two are sequenced is left to the caller.
type WriteCacheProxy struct {
	WriteDatabase, DeleteCache func() error
}
package main
import (
"errors"
"fmt"
"log"
"strconv"
"sync"
"time"
)
// main fires 10000 concurrent reads spread over 2500 distinct keys (four
// requests per key) through ReadCacheProxy.ProtectAllNode, logs the elapsed
// time and the number of replica hits, and panics unless the primary was read
// exactly once per distinct key.
func main() {
	const (
		requests     = 10000
		distinctKeys = requests / 4
	)

	var (
		replica  sync.Map
		flights  Singleflight[string, func() (*string, error)]
		counters sync.Mutex
		hitDB    int
		hitCache int
		pending  sync.WaitGroup
	)

	// count increments one of the hit counters under the shared mutex.
	count := func(counter *int) {
		counters.Lock()
		*counter++
		counters.Unlock()
	}

	startedAt := time.Now()

	pending.Add(requests)
	for i := 0; i < requests; i++ {
		go func(key string) {
			defer pending.Done()

			readProxy := ReadCacheProxy[string, func() (*string, error), func(*string) error]{
				ReadReplica: func() (*string, error) {
					if value, ok := replica.Load(key); ok {
						count(&hitCache)
						return value.(*string), nil
					}
					return nil, errors.New("not found")
				},
				ReadPrimary: func() (*string, error) {
					count(&hitDB)
					val := "read val" + key
					return &val, nil
				},
				WriteReplica: func(val *string) error {
					replica.Store(key, val)
					return nil
				},
				SingleFlight: &flights,
			}

			// readProxy.ProtectPrimaryNode(key) is the alternative entry
			// point; the returned value is not inspected here.
			if _, err := readProxy.ProtectAllNode(key); err != nil {
				panic(err)
			}
		}(strconv.Itoa(i % distinctKeys))
	}
	pending.Wait()

	finishedAt := time.Now()
	log.Println("cost", finishedAt.Sub(startedAt).String())
	log.Println("hit cache", hitCache)

	if hitDB != distinctKeys {
		panic(fmt.Sprintf("hit db %v", hitDB))
	}
}
// Singleflight collapses calls that share a key: the first call for a key runs
// its function, and every later call for that key receives the first call's
// result until the key is forgotten. The zero value is ready to use.
type Singleflight[Val any, F func() (*Val, error)] struct {
	// mu guards store. It is held only while store is inspected or updated,
	// never while a caller's function runs.
	mu sync.Mutex

	// store maps a key to the wait function of the call that owns it.
	store map[string]F
}

// Do executes and returns the results of the given function,
// making sure that only one execution is in-flight for a given key at a time.
//
// If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether val was given to multiple callers.
//
// The entry for key is kept after fn returns; call Forget or Expire to let a
// later Do run its function again. If fn does not return normally (it panics
// or exits the goroutine), callers sharing the entry receive a non-nil error
// rather than a nil value paired with a nil error.
//
// https://pkg.go.dev/golang.org/x/sync/singleflight#Group.Do
func (group *Singleflight[Val, F]) Do(key string, fn F) (val *Val, err error, shared bool) {
	group.mu.Lock()
	if wait, ok := group.store[key]; ok {
		group.mu.Unlock()
		val, err = wait()
		return val, err, true
	}

	if group.store == nil {
		group.store = make(map[string]F)
	}

	// The registered function is what duplicate callers run: it blocks until
	// done is closed and then reports this call's named results.
	done := make(chan struct{})
	group.store[key] = func() (*Val, error) {
		<-done
		return val, err
	}

	// Release the lock before running fn. Holding it across fn would
	// serialize calls for unrelated keys behind one slow function and would
	// deadlock if fn itself called Do; duplicates synchronize on done instead.
	group.mu.Unlock()

	// completed stays false when fn panics, so the deferred function can
	// publish an error to the waiters before releasing them.
	completed := false
	defer func() {
		if !completed {
			val, err = nil, errors.New("singleflight: fn did not complete")
		}
		close(done)
	}()

	val, err = fn()
	completed = true
	return val, err, false
}

// Forget tells the singleflight to forget about a key.
// Future calls to Do for this key will call the function
// rather than waiting for an earlier call to complete.
func (group *Singleflight[Val, F]) Forget(key string) {
	group.mu.Lock()
	defer group.mu.Unlock()
	delete(group.store, key)
}

// Expire schedules a Forget operation for a given key after a specified duration.
//
// Note: Singleflight is designed to prevent cache hotspot invalidation (cache penetration)
// by ensuring that only one execution is in-flight for a given key at a time.
// It should not be used as a cache.
func (group *Singleflight[Val, F]) Expire(key string, t time.Duration) {
	time.AfterFunc(t, func() {
		group.Forget(key)
	})
}
// ReadCacheProxy manages read and write operations between a database and a
// cache. It serves data from the Replica when possible and otherwise reads
// from the Primary.
//
// It also keeps a burst of concurrent requests for the same key from reaching
// the Primary (the hotspot-invalidation and cache-avalanche problems).
//
// (Primary, Replica) can stand for different kinds of storage, for example
// (Database, Cache) or (RemoteCache, LocalCache).
type ReadCacheProxy[ViewModel any, Read func() (*ViewModel, error), Write func(*ViewModel) error] struct {
	// ReadReplica is tried first; a non-nil error is treated as a miss.
	ReadReplica Read

	// ReadPrimary is called after ReadReplica reported an error.
	ReadPrimary Read

	// WriteReplica receives the value returned by ReadPrimary.
	WriteReplica Write

	// SingleFlight collapses reads that share a key.
	SingleFlight *Singleflight[ViewModel, Read]
}

// read tries the Replica, then loads from the Primary and writes the loaded
// value to the Replica. A failure of ReadPrimary or WriteReplica is returned
// with a nil value.
func (proxy ReadCacheProxy[ViewModel, Read, Write]) read() (output *ViewModel, err error) {
	if hit, missErr := proxy.ReadReplica(); missErr == nil {
		return hit, nil
	}

	fresh, loadErr := proxy.ReadPrimary()
	if loadErr != nil {
		return nil, loadErr
	}

	if storeErr := proxy.WriteReplica(fresh); storeErr != nil {
		return nil, storeErr
	}
	return fresh, nil
}

// ProtectPrimaryNode reads the Replica directly and only falls back to the
// single-flight path of ProtectAllNode when the Replica reports an error.
func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectPrimaryNode(key string) (output *ViewModel, err error) {
	if hit, missErr := proxy.ReadReplica(); missErr == nil {
		return hit, nil
	}
	return proxy.ProtectAllNode(key)
}

// ProtectAllNode routes the whole read (Replica first, then Primary) through
// SingleFlight, so overlapping calls for key share one execution of read. The
// entry is scheduled to be forgotten one second after the owning call returns.
func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectAllNode(key string) (output *ViewModel, err error) {
	// Only the call that actually ran read schedules the expiry. If every
	// sharer scheduled one too, each request would start its own timer, and a
	// sharer's late timer could forget a newer in-flight entry for the same
	// key, letting a second read start concurrently. The check is deferred so
	// that the expiry is still scheduled when Do panics (shared stays false).
	shared := false
	defer func() {
		if !shared {
			proxy.SingleFlight.Expire(key, time.Second)
		}
	}()

	output, err, shared = proxy.SingleFlight.Do(key, proxy.read)
	return output, err
}
// WriteCacheProxy groups the two callbacks of a write path: one named for
// writing the database and one named for deleting the cache entry. No method
// is defined on it here; how the two are sequenced is left to the caller.
type WriteCacheProxy struct {
	WriteDatabase, DeleteCache func() error
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment