Last active
June 2, 2020 15:08
-
-
Save artyom/5ec1cf2ad45b2b31d869df1ef0f7aee6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module radix-bug

go 1.14

require (
	github.com/mediocregopher/radix/v3 v3.5.1
	golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/mediocregopher/radix/v3 v3.5.1 h1:IOYgQUMA380N4khaL5eNT4v/P2LnHa8b0wnVdwZMFsY=
github.com/mediocregopher/radix/v3 v3.5.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"flag" | |
"fmt" | |
"log" | |
"os" | |
"path/filepath" | |
"runtime/debug" | |
"sync/atomic" | |
"time" | |
"github.com/mediocregopher/radix/v3" | |
"golang.org/x/sync/errgroup" | |
) | |
func main() { | |
debug.SetTraceback("all") | |
flag.Parse() | |
if err := run(flag.Arg(0)); err != nil { | |
os.Stderr.WriteString(err.Error() + "\n") | |
os.Exit(1) | |
} | |
} | |
func run(addr string) error { | |
if addr == "" { | |
return fmt.Errorf("usage: %s host:port", filepath.Base(os.Args[0])) | |
} | |
const name = "some-channel" | |
var drift int64 | |
group, ctx := errgroup.WithContext(context.Background()) | |
group.Go(func() error { | |
pool, err := radix.NewPool("tcp", addr, 4) | |
if err != nil { | |
return err | |
} | |
defer pool.Close() | |
ticker := time.NewTicker(10 * time.Millisecond) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ticker.C: | |
if err := pool.Do(radix.Cmd(nil, "PUBLISH", name, "payload")); err != nil { | |
return err | |
} | |
atomic.AddInt64(&drift, 1) | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
}) | |
group.Go(func() error { | |
rc, err := radix.Dial("tcp", addr, | |
radix.DialReadTimeout(time.Second/2), | |
radix.DialWriteTimeout(time.Second/2), | |
radix.DialConnectTimeout(time.Second/2)) | |
if err != nil { | |
return err | |
} | |
defer rc.Close() | |
conn := radix.PubSub(rc) | |
defer conn.Close() | |
ch := make(chan radix.PubSubMessage) | |
if err := conn.Subscribe(ch, name); err != nil { | |
return err | |
} | |
ticker := time.NewTicker(5 * time.Second) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case <-ticker.C: | |
if err := conn.Ping(); err != nil { | |
return err | |
} | |
case _, ok := <-ch: | |
atomic.AddInt64(&drift, -1) | |
if !ok { | |
return errors.New("zero value receive") | |
} | |
} | |
} | |
}) | |
group.Go(func() error { | |
ticker := time.NewTicker(2 * time.Second) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case <-ticker.C: | |
d := atomic.LoadInt64(&drift) | |
log.Println("send vs receive drift", d) | |
if d > 1000 { | |
panic("drift is too big, receiver likely stuck") | |
} | |
} | |
} | |
}) | |
return group.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment