Last active
October 22, 2022 23:21
-
-
Save elliotchance/7db9eb40a935f5526b62f0bcf77b7d2c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"strings" | |
"sync" | |
"time" | |
) | |
// ChannelPerf wraps a buffered channel and records how many values pass
// through it and how long callers spend in its Send/Receive methods.
type ChannelPerf struct {
	// ch is the wrapped channel that carries the values.
	ch chan interface{}
	// sent counts completed Send calls since the last Reset.
	sent int64
	// received counts values successfully received since the last Reset.
	received int64
	// sendBlockingDuration accumulates time spent inside Send.
	sendBlockingDuration time.Duration
	// receiveBlockingDuration accumulates time spent inside Receive and
	// ReceiveFor.
	receiveBlockingDuration time.Duration
	// start is when the counters were created or last reset.
	start time.Time
	// timeLock guards the counters, the durations and start.
	timeLock sync.Mutex
	// name labels this channel in String output.
	name string
}
func NewChannelPerf(size int, name string) *ChannelPerf { | |
return &ChannelPerf{ | |
ch: make(chan interface{}, size), | |
start: time.Now(), | |
name: name, | |
} | |
} | |
func (cf *ChannelPerf) Receive() (obj interface{}, received bool) { | |
start := time.Now() | |
obj, received = <-cf.ch | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.receiveBlockingDuration += time.Now().Sub(start) | |
if received { | |
cf.received++ | |
} | |
return | |
} | |
func (cf *ChannelPerf) Send(obj interface{}) { | |
start := time.Now() | |
cf.ch <- obj | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.sendBlockingDuration += time.Now().Sub(start) | |
cf.sent++ | |
} | |
func (cf *ChannelPerf) Close() { | |
close(cf.ch) | |
} | |
func (cf *ChannelPerf) Reset() { | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.sendBlockingDuration = 0 | |
cf.receiveBlockingDuration = 0 | |
cf.start = time.Now() | |
cf.sent = 0 | |
cf.received = 0 | |
} | |
func (cf *ChannelPerf) String() string { | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
return fmt.Sprintf("%s (clock: %v, sent: %d in %v, received: %d in %v)", | |
cf.name, time.Now().Sub(cf.start), | |
cf.sent, cf.sendBlockingDuration, | |
cf.received, cf.receiveBlockingDuration) | |
} | |
func (cf *ChannelPerf) ReceiveFor(expire time.Duration) (obj interface{}, received bool, timedOut bool) { | |
start := time.Now() | |
select { | |
case obj, received = <-cf.ch: | |
case <-time.After(expire): | |
timedOut = true | |
} | |
cf.timeLock.Lock() | |
defer cf.timeLock.Unlock() | |
cf.receiveBlockingDuration += time.Now().Sub(start) | |
if received { | |
cf.received++ | |
} | |
return | |
} | |
func (cf *ChannelPerf) Channel() chan interface{} { | |
return cf.ch | |
} | |
type ChannelPerfGroup struct { | |
channels []*ChannelPerf | |
channelsLock sync.Mutex | |
} | |
func NewChannelPerfGroup() *ChannelPerfGroup { | |
return &ChannelPerfGroup{} | |
} | |
func (g *ChannelPerfGroup) NewChannelPerf(size int, name string) *ChannelPerf { | |
channel := NewChannelPerf(size, name) | |
g.channelsLock.Lock() | |
defer g.channelsLock.Unlock() | |
g.channels = append(g.channels, channel) | |
return channel | |
} | |
func (g *ChannelPerfGroup) String() string { | |
g.channelsLock.Lock() | |
defer g.channelsLock.Unlock() | |
var lines []string | |
for _, channel := range g.channels { | |
lines = append(lines, channel.String()) | |
} | |
return strings.Join(lines, "; ") | |
} | |
func (g *ChannelPerfGroup) Reset() { | |
g.channelsLock.Lock() | |
defer g.channelsLock.Unlock() | |
for _, channel := range g.channels { | |
channel.Reset() | |
} | |
} | |
func (g *ChannelPerfGroup) StartTicker(every time.Duration, reset bool, fn func()) { | |
go func() { | |
for range time.NewTicker(every).C { | |
fn() | |
if reset { | |
g.Reset() | |
} | |
} | |
}() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment