Skip to content

Instantly share code, notes, and snippets.

@tylertreat
Last active August 29, 2015 14:11
Show Gist options
  • Save tylertreat/72dd72a5b2ed0251443f to your computer and use it in GitHub Desktop.
Save tylertreat/72dd72a5b2ed0251443f to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"strconv"
"time"
"github.com/gdamore/mangos"
"github.com/gdamore/mangos/protocol/pub"
"github.com/gdamore/mangos/protocol/sub"
"github.com/gdamore/mangos/transport/tcp"
)
func main() {
pubA, err := pub.NewSocket()
if err != nil {
panic(err)
}
pubA.AddTransport(tcp.NewTransport())
if err := pubA.Listen("tcp://:6000"); err != nil {
panic(err)
}
pubB, err := pub.NewSocket()
if err != nil {
panic(err)
}
pubB.AddTransport(tcp.NewTransport())
if err := pubB.Listen("tcp://:7000"); err != nil {
panic(err)
}
subA, err := sub.NewSocket()
if err != nil {
panic(err)
}
subA.AddTransport(tcp.NewTransport())
subA.Dial("tcp://localhost:7000")
subA.SetOption(mangos.OptionSubscribe, []byte{})
subB, err := sub.NewSocket()
if err != nil {
panic(err)
}
subB.AddTransport(tcp.NewTransport())
subB.Dial("tcp://localhost:6000")
subB.SetOption(mangos.OptionSubscribe, []byte{})
time.Sleep(time.Second)
go func() {
go func() {
for i := 0; i < 100; i++ {
if err := pubA.Send([]byte("c: " + strconv.Itoa(i))); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}()
for i := 0; i < 100; i++ {
if err := pubA.Send([]byte("a: " + strconv.Itoa(i))); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}()
go func() {
for i := 0; i < 100; i++ {
if err := pubB.Send([]byte("b: " + strconv.Itoa(i))); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}()
go func() {
for {
msg, _ := subB.Recv()
fmt.Println(string(msg))
}
}()
for {
msg, _ := subA.Recv()
fmt.Println(string(msg))
}
}
@tylertreat
Copy link
Author

It looks like the messages are being dropped here:

https://github.com/gdamore/mangos/blob/master/protocol/pub/pub.go#L78

@tylertreat
Copy link
Author

It turns out the messages are being dropped by the publisher because it uses an unbuffered channel: https://github.com/gdamore/mangos/blob/aee5ac9cca8ad3f72fd5e5792257ca6d4f593d32/protocol/pub/pub.go#L86

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment