Skip to content

Instantly share code, notes, and snippets.

@JeffreyVdb
Created June 28, 2015 09:41
Show Gist options
  • Save JeffreyVdb/8f593f6de42e332b0cee to your computer and use it in GitHub Desktop.
Save JeffreyVdb/8f593f6de42e332b0cee to your computer and use it in GitHub Desktop.
async map go
package main
import (
"fmt"
"hash/fnv"
"sync"
)
const (
// shardCount is the fixed number of shards a requestMapShards is built
// with; shard selection reduces the key hash modulo this value.
shardCount = 32
)
// requestMapShards is a slice of pointers to syncRequestMap shards. Each
// shard carries its own lock, so operations on keys that hash to different
// shards do not contend with one another.
type requestMapShards []*syncRequestMap
// syncRequestMap is a single shard: a map of pending requests guarded by an
// embedded read/write mutex.
type syncRequestMap struct {
// requests maps a request key to the channels waiting on that key.
// The methods in this file only touch it while holding the embedded lock.
requests map[string][]chan []byte
// Embedded so Lock/Unlock/RLock/RUnlock are promoted onto the shard.
// A sync.RWMutex must not be copied after first use, so shards are
// handled by pointer.
sync.RWMutex
}
// newRequestMapShards builds a requestMapShards containing shardCount shards,
// each with its own lock and an empty, ready-to-use request map.
//
// initialCapacity is treated as a sizing hint for the total number of keys
// expected across all shards. It is divided evenly between the shards
// (rounded up) and passed to make as each map's capacity hint. Values <= 0
// mean "no hint". The hint only affects pre-allocation, never behaviour.
func newRequestMapShards(initialCapacity int) (m requestMapShards) {
	perShard := 0
	if initialCapacity > 0 {
		// Ceiling division written so it cannot overflow for large hints.
		perShard = (initialCapacity-1)/shardCount + 1
	}

	m = make(requestMapShards, shardCount)
	for i := range m {
		m[i] = &syncRequestMap{
			requests: make(map[string][]chan []byte, perShard),
		}
	}
	return m
}
// GetShard returns the shard responsible for key. The key is hashed with
// 32-bit FNV-1 and the hash is reduced modulo shardCount to pick the slot.
func (m requestMapShards) GetShard(key string) *syncRequestMap {
	h := fnv.New32()
	// hash.Hash documents that Write never returns an error.
	h.Write([]byte(key))
	slot := h.Sum32() % uint32(shardCount)
	return m[slot]
}
// Add registers channel as a waiter for key, holding the owning shard's
// write lock for the duration of the update.
//
// newChan reports whether the key had no waiter list with capacity before
// this call (i.e. this is the first waiter); in that case a fresh list with
// room for 10 channels is allocated before the channel is appended.
func (m *requestMapShards) Add(key string, channel chan []byte) (newChan bool) {
	s := m.GetShard(key)
	s.Lock()
	defer s.Unlock()

	waiters := s.requests[key]
	newChan = cap(waiters) == 0
	if newChan {
		waiters = make([]chan []byte, 0, 10)
	}
	waiters = append(waiters, channel)
	s.requests[key] = waiters
	return newChan
}
// Get looks up the channels waiting on key under the owning shard's read
// lock. ok is false when the key has no entry. The returned slice is the
// one stored in the shard, not a copy.
func (m requestMapShards) Get(key string) (chans []chan []byte, ok bool) {
	s := m.GetShard(key)

	s.RLock()
	chans, ok = s.requests[key]
	s.RUnlock()

	return chans, ok
}
// NumWaiting returns the total number of channels registered across all
// shards. Shards are read-locked one at a time, so the result is a sum of
// per-shard counts rather than a single atomic snapshot of the whole map.
func (m requestMapShards) NumWaiting() (cnt int) {
	for n := 0; n < shardCount; n++ {
		s := m[n]

		subtotal := 0
		s.RLock()
		for _, waiters := range s.requests {
			subtotal += len(waiters)
		}
		s.RUnlock()

		cnt += subtotal
	}
	return cnt
}
// Remove drops key and its waiter list from the owning shard while holding
// that shard's write lock. Removing a key that is not present is a no-op.
func (m *requestMapShards) Remove(key string) {
	s := m.GetShard(key)

	s.Lock()
	delete(s.requests, key)
	s.Unlock()
}
// FeedValue asynchronously delivers data to every channel currently waiting
// on key and removes the key from its shard.
//
// The call returns immediately; the work runs on a new goroutine. That
// goroutine detaches the waiter list and deletes the key while holding the
// shard's write lock, releases the lock, and only then sends data to each
// channel in the order the channels were added.
//
// The sends are deliberately done outside the lock: a channel send can block
// until its receiver is ready, and blocking while holding the exclusive lock
// would stall every other operation on the same shard (and deadlock if a
// receiver calls back into the map). Channels added for key after the list
// has been detached belong to a new request and are not fed by this call.
func (m requestMapShards) FeedValue(key string, data []byte) {
	go func() {
		shard := m.GetShard(key)

		// Take ownership of the waiter list and delete the request in one
		// short critical section.
		shard.Lock()
		chans := shard.requests[key]
		delete(shard.requests, key)
		shard.Unlock()

		fmt.Println("Removing request:", key)

		// No lock is held here, so a slow receiver only delays this
		// goroutine, not other users of the shard.
		for _, ch := range chans {
			ch <- data
		}
	}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment