Skip to content

Instantly share code, notes, and snippets.

@FZambia
Last active January 20, 2021 11:21
Show Gist options
  • Save FZambia/89b62d656325b704799d to your computer and use it in GitHub Desktop.
Save FZambia/89b62d656325b704799d to your computer and use it in GitHub Desktop.
Redis PUB/SUB vs RethinkDB PUB/SUB
package main
import (
"github.com/garyburd/redigo/redis"
)
func main() {
subConn, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
panic(err)
}
defer subConn.Close()
subscriber := redis.PubSubConn{subConn}
subscriber.Subscribe("channel1")
count := 0
done := make(chan struct{})
go func() {
for {
switch subscriber.Receive().(type) {
case redis.Message:
count++
if count%1000 == 0 {
println(count)
}
if count == 100000 {
close(done)
}
case redis.Subscription:
case error:
return
}
}
}()
numPublishers := 1
for i := 0; i < numPublishers; i++ {
conn, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
panic(err)
}
defer conn.Close()
go func(c redis.Conn) {
for {
_, err = c.Do("PUBLISH", "channel1", "opa")
if err != nil {
println(err.Error())
}
}
}(conn)
}
<-done
}
package main
import (
"strings"
r "github.com/dancannon/gorethink"
)
type Exchange struct {
asserted bool
name string
table r.Term
session *r.Session
subs map[string]chan struct{}
}
func NewExchange(name string, session *r.Session) *Exchange {
return &Exchange{
name: name,
table: r.Table(name),
session: session,
subs: make(map[string]chan struct{}),
}
}
func (e *Exchange) Publish(topicKey string, payload string) error {
resp, err := e.table.Filter(map[string]interface{}{
"topic": topicKey,
}).Update(map[string]interface{}{
"topic": topicKey, "updated_on": r.Now(), "payload": payload,
}).RunWrite(e.session)
if err != nil {
return err
}
if resp.Replaced == 0 {
err := e.table.Insert(map[string]interface{}{
"topic": topicKey, "payload": payload, "updated_on": r.Now(),
}).Exec(e.session, r.ExecOpts{NoReply: true})
if err != nil {
return err
}
}
return nil
}
func (e *Exchange) fullQuery(matchRegexp string) r.Term {
return e.table.Changes().Field("new_val").Filter(func(row r.Term) r.Term {
return row.Field("topic").Match(matchRegexp)
})
}
func (e *Exchange) assertTable() error {
if e.asserted {
return nil
}
_, err := r.TableCreate(e.name, r.TableCreateOpts{
Durability: "soft",
}).RunWrite(e.session)
if err != nil && !strings.Contains(err.Error(), "already exists") {
return err
}
e.asserted = true
return nil
}
type Message struct {
Topic string
Payload string
}
func (e *Exchange) Subscribe(topicKey string, match string, ch chan Message) error {
err := e.assertTable()
if err != nil {
return err
}
messages, err := e.fullQuery(match).Run(e.session)
if err != nil {
return err
}
closeCh := make(chan struct{})
e.subs[topicKey] = closeCh
go func() {
defer messages.Close()
var message Message
for messages.Next(&message) {
select {
case ch <- message:
case <-closeCh:
return
}
}
}()
return nil
}
func (e *Exchange) Unsubscribe(topicKey string) {
if ch, ok := e.subs[topicKey]; ok {
close(ch)
}
}
func main() {
ch := make(chan Message, 4096)
var session *r.Session
session, err := r.Connect(r.ConnectOpts{
Address: "localhost:28015",
})
if err != nil {
panic(err)
}
exch := NewExchange("subscribe_demo", session)
err = exch.Subscribe("channel1", "channel1", ch)
if err != nil {
panic(err)
}
count := 0
done := make(chan struct{})
go func() {
for {
select {
case msg := <-ch:
if msg.Topic == "" {
panic("empty topic")
}
count++
if count%1000 == 0 {
println(count)
}
if count == 100000 {
close(done)
}
}
}
}()
numPublishers := 1
for i := 0; i < numPublishers; i++ {
pubSession, err := r.Connect(r.ConnectOpts{
Address: "localhost:28015",
})
if err != nil {
panic(err)
}
pubExch := NewExchange("subscribe_demo", pubSession)
go func(ex *Exchange) {
for {
err := ex.Publish("channel1", "opa1")
if err != nil {
println(err)
}
}
}(pubExch)
}
<-done
}
@abserari
Copy link

abserari commented Jan 20, 2021

Appreciate your gist! Could you comment few words with the difference between the two database user experiences?

@FZambia
Copy link
Author

FZambia commented Jan 20, 2021

@abserari hello, thanks! That was a long time ago when I wrote this – so I don't remember the actual difference in performance here. Rethinkdb code left as a prototype - so I never used RethinkDB in production and can't say about the difference from operational perspective too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment