Last active
July 15, 2024 09:20
-
-
Save KScaesar/d297e9daa22879dd8d9c9f6b1258b333 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| type ReadCacheProxy[Input, ViewModel any] struct { | |
| ReadCache func(input *Input) (output *ViewModel, err error) | |
| ReadDatabase func(input *Input) (output *ViewModel, err error) | |
| WriteCache func(output *ViewModel) error | |
| Key string | |
| Locks *sync.Map | |
| } | |
| func (proxy ReadCacheProxy[Input, ViewModel]) Read(input *Input) (output *ViewModel, err error) { | |
| cacheVal, err := proxy.ReadCache(input) | |
| if err == nil { | |
| return cacheVal, nil | |
| } | |
| doneRead := make(chan struct{}) | |
| read := func() (*ViewModel, error) { | |
| <-doneRead | |
| return output, err | |
| } | |
| key := proxy.Key | |
| Func, loaded := proxy.Locks.LoadOrStore(key, read) | |
| if loaded { | |
| onceRead := Func.(func() (*ViewModel, error)) | |
| return onceRead() | |
| } | |
| defer func() { | |
| close(doneRead) | |
| proxy.Locks.Delete(key) | |
| }() | |
| return proxy.read(input) | |
| } | |
| func (proxy *ReadCacheProxy[Input, ViewModel]) read(input *Input) (output *ViewModel, err error) { | |
| cacheVal, err := proxy.ReadCache(input) | |
| if err == nil { | |
| return cacheVal, nil | |
| } | |
| dbVal, err := proxy.ReadDatabase(input) | |
| if err != nil { | |
| return nil, err | |
| } | |
| err = proxy.WriteCache(dbVal) | |
| if err != nil { | |
| return nil, err | |
| } | |
| return dbVal, nil | |
| } | |
| type WriteCacheProxy[DomainModel any, Func func(val *DomainModel) error] struct { | |
| WriteDatabase Func | |
| DeleteCache Func | |
| } | |
| func (proxy WriteCacheProxy[DomainModel, Func]) Write(val *DomainModel) error { | |
| err := proxy.WriteDatabase(val) | |
| if err != nil { | |
| return err | |
| } | |
| return proxy.DeleteCache(val) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "errors" | |
| "log" | |
| "sync" | |
| ) | |
| func main() { | |
| cache := sync.Map{} | |
| locks := sync.Map{} | |
| key := "gg" | |
| times := 0 | |
| wg := sync.WaitGroup{} | |
| for i := 0; i < 100; i++ { | |
| n := i | |
| wg.Add(1) | |
| go func() { | |
| defer wg.Done() | |
| proxy := ReadCacheProxy[string, func() (*string, error), func(*string) error]{ | |
| ReadSecondary: func() (*string, error) { | |
| value, ok := cache.Load(key) | |
| if ok { | |
| return value.(*string), nil | |
| } | |
| return nil, errors.New("not found") | |
| }, | |
| ReadPrimary: func() (*string, error) { | |
| times++ | |
| val := "read database" | |
| if times != 1 { | |
| panic("multi request to db") | |
| } | |
| return &val, nil | |
| }, | |
| WriteSecondary: func(val *string) error { | |
| cache.Store(key, val) | |
| return nil | |
| }, | |
| ShardLocks: &locks, | |
| } | |
| output, err := proxy.FastRead(key) | |
| if err != nil { | |
| panic(err) | |
| } | |
| log.Println(n, *output) | |
| }() | |
| } | |
| wg.Wait() | |
| } | |
| // ReadCacheProxy 用於管理 database 和 cache 之間的讀取和寫入操作, | |
| // 允許從 Secondary 中快速獲取資料, | |
| // 如果無法取得,則嘗試從 Primary 中讀取, | |
| // 並避免併發同一個 key 的大量請求到達 Primary, 解決 Hotspot Invalid, Cache Avalanche 問題 | |
| // | |
| // ( Primary, Secondary ) 可以分別代表不同的資料存取方式, | |
| // 例如 ( Database, Cache ) 或 ( LocalCache, RemoteCache ) | |
| type ReadCacheProxy[ViewModel any, rFunc func() (*ViewModel, error), wFunc func(*ViewModel) error] struct { | |
| ReadSecondary rFunc | |
| ReadPrimary rFunc | |
| WriteSecondary wFunc | |
| ShardLocks *sync.Map | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, rFunc, wFunc]) FastRead(key any) (output *ViewModel, err error) { | |
| output, err = proxy.ReadSecondary() | |
| if err == nil { | |
| return output, nil | |
| } | |
| return proxy.SafeRead(key) | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, rFunc, wFunc]) SafeRead(key any) (output *ViewModel, err error) { | |
| doneRead := make(chan struct{}) | |
| read := func() (*ViewModel, error) { | |
| <-doneRead | |
| return output, err | |
| } | |
| Func, loaded := proxy.ShardLocks.LoadOrStore(key, read) | |
| if loaded { | |
| // 避免併發同一個 key 的大量請求到達 Primary | |
| // 共用第一次請求的 output | |
| onceRead := Func.(rFunc) | |
| return onceRead() | |
| } | |
| defer func() { | |
| close(doneRead) | |
| proxy.ShardLocks.Delete(key) | |
| }() | |
| return proxy.read() | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, rFunc, wFunc]) read() (output *ViewModel, err error) { | |
| output, err = proxy.ReadSecondary() | |
| if err == nil { | |
| return output, nil | |
| } | |
| output, err = proxy.ReadPrimary() | |
| if err != nil { | |
| return nil, err | |
| } | |
| err = proxy.WriteSecondary(output) | |
| if err != nil { | |
| return nil, err | |
| } | |
| return output, nil | |
| } | |
| type WriteCacheProxy struct { | |
| WriteDatabase func() error | |
| DeleteCache func() error | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "errors" | |
| "log" | |
| "sync" | |
| "time" | |
| ) | |
| func main() { | |
| cache := sync.Map{} | |
| group := Singleflight[string, func() (*string, error)]{} | |
| key := "gg" | |
| var mu sync.Mutex | |
| hitDB := 0 | |
| hitCache := 0 | |
| requests := 10000 | |
| t1 := time.Now() | |
| wg := sync.WaitGroup{} | |
| for i := 0; i < requests; i++ { | |
| n := i | |
| _ = n | |
| wg.Add(1) | |
| go func() { | |
| defer wg.Done() | |
| readProxy := ReadCacheProxy[string, func() (*string, error), func(*string) error]{ | |
| ReadReplica: func() (*string, error) { | |
| value, ok := cache.Load(key) | |
| if ok { | |
| mu.Lock() | |
| hitCache++ | |
| mu.Unlock() | |
| return value.(*string), nil | |
| } | |
| return nil, errors.New("not found") | |
| }, | |
| ReadPrimary: func() (*string, error) { | |
| mu.Lock() | |
| hitDB++ | |
| mu.Unlock() | |
| val := "read val" | |
| return &val, nil | |
| }, | |
| WriteReplica: func(val *string) error { | |
| cache.Store(key, val) | |
| return nil | |
| }, | |
| SingleFlight: &group, | |
| } | |
| // output, err := readProxy.ProtectPrimaryNode(key) | |
| output, err := readProxy.ProtectAllNode(key) | |
| if err != nil { | |
| panic(err) | |
| } | |
| _ = output | |
| // log.Println(n, *output) | |
| }() | |
| } | |
| wg.Wait() | |
| t2 := time.Now() | |
| log.Println("cost", t2.Sub(t1).String()) | |
| log.Println("hit cache", hitCache) | |
| if hitDB != 1 { | |
| panic("multi req to db") | |
| } | |
| } | |
| type Singleflight[Val any, F func() (*Val, error)] struct { | |
| shardLocks sync.Map | |
| } | |
| // Do executes and returns the results of the given function, | |
| // making sure that only one execution is in-flight for a given key at a time. | |
| // | |
| // If a duplicate comes in, the duplicate caller waits for the | |
| // original to complete and receives the same results. | |
| // The return value shared indicates whether val was given to multiple callers. | |
| // | |
| // https://pkg.go.dev/golang.org/x/sync/singleflight#Group.Do | |
| func (group *Singleflight[Val, F]) Do(key string, fn F) (val *Val, err error, shared bool) { | |
| done := make(chan struct{}) | |
| wait := func() (*Val, error) { | |
| <-done | |
| return val, err | |
| } | |
| Func, loaded := group.shardLocks.LoadOrStore(key, wait) | |
| if loaded { | |
| val, err = Func.(func() (*Val, error))() | |
| return val, err, true | |
| } | |
| defer close(done) | |
| val, err = fn() | |
| return val, err, false | |
| } | |
| // Forget tells the singleflight to forget about a key. | |
| // Future calls to Do for this key will call the function | |
| // rather than waiting for an earlier call to complete. | |
| func (group *Singleflight[Val, F]) Forget(key string) { | |
| group.shardLocks.Delete(key) | |
| } | |
| // Expire schedules a Forget operation for a given key after a specified duration. | |
| // | |
| // Note: Singleflight is designed to prevent cache hotspot invalidation (cache penetration) | |
| // by ensuring that only one execution is in-flight for a given key at a time. | |
| // It should not be used as a cache. | |
| func (group *Singleflight[Val, F]) Expire(key string, t time.Duration) { | |
| time.AfterFunc(t, func() { | |
| group.Forget(key) | |
| }) | |
| } | |
| // ReadCacheProxy 用於管理 database 和 cache 之間的讀取和寫入操作, | |
| // 允許從 Replica 中快速獲取資料, 如果無法取得,則嘗試從 Primary 中讀取. | |
| // | |
| // 同時避免併發同一個 key 的大量請求到達 Primary, 解決 Hotspot Invalid, Cache Avalanche 問題 | |
| // | |
| // ( Primary, Replica ) 可以分別代表不同的資料存取方式, | |
| // 例如 ( Database, Cache ) 或 ( LocalCache, RemoteCache ) | |
| type ReadCacheProxy[ViewModel any, Read func() (*ViewModel, error), Write func(*ViewModel) error] struct { | |
| ReadReplica Read | |
| ReadPrimary Read | |
| WriteReplica Write | |
| SingleFlight *Singleflight[ViewModel, Read] | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, Read, Write]) read() (output *ViewModel, err error) { | |
| output, err = proxy.ReadReplica() | |
| if err == nil { | |
| return output, nil | |
| } | |
| output, err = proxy.ReadPrimary() | |
| if err != nil { | |
| return nil, err | |
| } | |
| err = proxy.WriteReplica(output) | |
| if err != nil { | |
| return nil, err | |
| } | |
| return output, nil | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectPrimaryNode(key string) (output *ViewModel, err error) { | |
| output, err = proxy.ReadReplica() | |
| if err == nil { | |
| return output, nil | |
| } | |
| return proxy.ProtectAllNode(key) | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectAllNode(key string) (output *ViewModel, err error) { | |
| val, err, _ := proxy.SingleFlight.Do(key, proxy.read) | |
| proxy.SingleFlight.Expire(key, time.Second) | |
| // proxy.SingleFlight.Forget(key) | |
| return val, err | |
| } | |
| type WriteCacheProxy struct { | |
| WriteDatabase func() error | |
| DeleteCache func() error | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "errors" | |
| "fmt" | |
| "log" | |
| "strconv" | |
| "sync" | |
| "time" | |
| ) | |
| func main() { | |
| cache := sync.Map{} | |
| group := Singleflight[string, func() (*string, error)]{} | |
| var mu sync.Mutex | |
| hitDB := 0 | |
| hitCache := 0 | |
| requests := 10000 | |
| t1 := time.Now() | |
| wg := sync.WaitGroup{} | |
| for i := 0; i < requests; i++ { | |
| n := i % (requests / 4) | |
| key := strconv.Itoa(n) | |
| wg.Add(1) | |
| go func() { | |
| defer wg.Done() | |
| readProxy := ReadCacheProxy[string, func() (*string, error), func(*string) error]{ | |
| ReadReplica: func() (*string, error) { | |
| value, ok := cache.Load(key) | |
| if ok { | |
| mu.Lock() | |
| hitCache++ | |
| mu.Unlock() | |
| return value.(*string), nil | |
| } | |
| return nil, errors.New("not found") | |
| }, | |
| ReadPrimary: func() (*string, error) { | |
| mu.Lock() | |
| hitDB++ | |
| mu.Unlock() | |
| val := "read val" + key | |
| return &val, nil | |
| }, | |
| WriteReplica: func(val *string) error { | |
| cache.Store(key, val) | |
| return nil | |
| }, | |
| SingleFlight: &group, | |
| } | |
| // output, err := readProxy.ProtectPrimaryNode(key) | |
| output, err := readProxy.ProtectAllNode(key) | |
| if err != nil { | |
| panic(err) | |
| } | |
| _ = output | |
| // log.Println(n, *output) | |
| }() | |
| } | |
| wg.Wait() | |
| t2 := time.Now() | |
| log.Println("cost", t2.Sub(t1).String()) | |
| log.Println("hit cache", hitCache) | |
| if hitDB != (requests / 4) { | |
| panic(fmt.Sprintf("hit db %v", hitDB)) | |
| } | |
| } | |
| type Singleflight[Val any, F func() (*Val, error)] struct { | |
| mu sync.Mutex | |
| store map[string]F | |
| } | |
| // Do executes and returns the results of the given function, | |
| // making sure that only one execution is in-flight for a given key at a time. | |
| // | |
| // If a duplicate comes in, the duplicate caller waits for the | |
| // original to complete and receives the same results. | |
| // The return value shared indicates whether val was given to multiple callers. | |
| // | |
| // https://pkg.go.dev/golang.org/x/sync/singleflight#Group.Do | |
| func (group *Singleflight[Val, F]) Do(key string, fn F) (val *Val, err error, shared bool) { | |
| group.mu.Lock() | |
| if group.store == nil { | |
| group.store = make(map[string]F) | |
| } | |
| wait, ok := group.store[key] | |
| if ok { | |
| group.mu.Unlock() | |
| val, err = wait() | |
| return val, err, true | |
| } | |
| done := make(chan struct{}) | |
| once := func() (*Val, error) { | |
| <-done | |
| return val, err | |
| } | |
| group.store[key] = once | |
| defer func() { | |
| close(done) | |
| group.mu.Unlock() | |
| }() | |
| val, err = fn() | |
| return val, err, false | |
| } | |
| // Forget tells the singleflight to forget about a key. | |
| // Future calls to Do for this key will call the function | |
| // rather than waiting for an earlier call to complete. | |
| func (group *Singleflight[Val, F]) Forget(key string) { | |
| group.mu.Lock() | |
| delete(group.store, key) | |
| group.mu.Unlock() | |
| } | |
| // Expire schedules a Forget operation for a given key after a specified duration. | |
| // | |
| // Note: Singleflight is designed to prevent cache hotspot invalidation (cache penetration) | |
| // by ensuring that only one execution is in-flight for a given key at a time. | |
| // It should not be used as a cache. | |
| func (group *Singleflight[Val, F]) Expire(key string, t time.Duration) { | |
| time.AfterFunc(t, func() { | |
| group.Forget(key) | |
| }) | |
| } | |
| // ReadCacheProxy 用於管理 database 和 cache 之間的讀取和寫入操作, | |
| // 允許從 Replica 中快速獲取資料, 如果無法取得,則嘗試從 Primary 中讀取. | |
| // | |
| // 同時避免併發同一個 key 的大量請求到達 Primary, 解決 Hotspot Invalid, Cache Avalanche 問題 | |
| // | |
| // ( Primary, Replica ) 可以分別代表不同的資料存取方式, | |
| // 例如 ( Database, Cache ) 或 ( RemoteCache, LocalCache ) | |
| type ReadCacheProxy[ViewModel any, Read func() (*ViewModel, error), Write func(*ViewModel) error] struct { | |
| ReadReplica Read | |
| ReadPrimary Read | |
| WriteReplica Write | |
| SingleFlight *Singleflight[ViewModel, Read] | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, Read, Write]) read() (output *ViewModel, err error) { | |
| output, err = proxy.ReadReplica() | |
| if err == nil { | |
| return output, nil | |
| } | |
| output, err = proxy.ReadPrimary() | |
| if err != nil { | |
| return nil, err | |
| } | |
| err = proxy.WriteReplica(output) | |
| if err != nil { | |
| return nil, err | |
| } | |
| return output, nil | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectPrimaryNode(key string) (output *ViewModel, err error) { | |
| output, err = proxy.ReadReplica() | |
| if err == nil { | |
| return output, nil | |
| } | |
| return proxy.ProtectAllNode(key) | |
| } | |
| func (proxy ReadCacheProxy[ViewModel, Read, Write]) ProtectAllNode(key string) (output *ViewModel, err error) { | |
| val, err, _ := proxy.SingleFlight.Do(key, proxy.read) | |
| proxy.SingleFlight.Expire(key, time.Second) | |
| // proxy.SingleFlight.Forget(key) | |
| return val, err | |
| } | |
| type WriteCacheProxy struct { | |
| WriteDatabase func() error | |
| DeleteCache func() error | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment