Last active
March 12, 2020 09:50
-
-
Save blixt/494ddedb96abb3ad60b936713de0d434 to your computer and use it in GitHub Desktop.
Dedupe multiple concurrent calls to the same function and return the same result to all callers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
// Dedupe combines concurrent calls that share the same key (a string) into a single call.
// Example:
func slowlyAdd(a, b int) int { | |
key := fmt.Sprintf("%d+%d", a, b) | |
v, _ := Dedupe(key, func() (interface{}, error) { | |
// Here we put the slow code that we don't mind only running once. | |
time.Sleep(2 * time.Second) | |
return a + b, nil | |
}) | |
return v.(int) | |
} | |
func main() { | |
fmt.Println("Please wait while we slowly add the same numbers together a few times!") | |
start := time.Now() | |
var wg sync.WaitGroup | |
for i := 0; i < 5; i++ { | |
// Run five slowlyAdd calls in parallel (separate goroutines). | |
wg.Add(1) | |
go func() { | |
fmt.Printf("Adding 1 + 2: %d\n", slowlyAdd(1, 2)) | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
fmt.Printf("Total time taken: %s\n", time.Since(start)) | |
} | |
// Dedupe implementation:
// result carries the outcome of one perform call to the callers that joined it.
type result struct {
	value interface{}
	err   error
}

var (
	// dedupeMap holds, for every key with a call in flight, the channels of
	// the callers waiting for that call's outcome. A key is present (possibly
	// with no waiters) from the moment a call starts until its outcome has
	// been collected for delivery.
	dedupeMap = make(map[string][]chan<- result)
	// dedupeMu guards dedupeMap.
	dedupeMu sync.Mutex
)

// Dedupe combines concurrent calls that share the same key into one.
//
// The first caller for a key runs perform; callers that arrive with the same
// key while it is still running block and then receive the same value and
// error, without their own perform being invoked. Once the call has finished,
// the next caller for that key runs perform again.
//
// If perform panics (or otherwise exits without returning), the panic
// propagates to the caller that ran it, and every caller that was waiting on
// it receives a nil value and a non-nil error instead of blocking forever.
func Dedupe(key string, perform func() (interface{}, error)) (interface{}, error) {
	dedupeMu.Lock()
	if calls, ok := dedupeMap[key]; ok {
		// A call is already in flight: join its list of waiters. The channel
		// is buffered so the result can be written before it is read.
		ch := make(chan result, 1)
		dedupeMap[key] = append(calls, ch)
		dedupeMu.Unlock()
		r := <-ch
		return r.value, r.err
	}
	// No call in flight for this key: register one. Presence of the key is
	// what marks the call as ongoing; the waiter list starts out empty.
	dedupeMap[key] = nil
	dedupeMu.Unlock()

	var (
		r         result
		completed bool
	)
	// Cleanup and fan-out run in a defer so that they also happen when
	// perform panics; otherwise the key would stay registered and every
	// current and future caller for it would block forever.
	defer func() {
		if !completed {
			r = result{err: fmt.Errorf("dedupe: call for key %q did not complete", key)}
		}

		dedupeMu.Lock()
		calls := dedupeMap[key]
		delete(dedupeMap, key)
		dedupeMu.Unlock()

		// Each channel has capacity one and is written exactly once, so these
		// sends never block.
		for _, ch := range calls {
			ch <- r
		}
	}()

	r.value, r.err = perform()
	completed = true
	return r.value, r.err
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment