Last active
January 20, 2021 11:21
-
-
Save FZambia/89b62d656325b704799d to your computer and use it in GitHub Desktop.
Redis PUB/SUB vs RethinkDB PUB/SUB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/garyburd/redigo/redis" | |
) | |
func main() { | |
subConn, err := redis.Dial("tcp", "127.0.0.1:6379") | |
if err != nil { | |
panic(err) | |
} | |
defer subConn.Close() | |
subscriber := redis.PubSubConn{subConn} | |
subscriber.Subscribe("channel1") | |
count := 0 | |
done := make(chan struct{}) | |
go func() { | |
for { | |
switch subscriber.Receive().(type) { | |
case redis.Message: | |
count++ | |
if count%1000 == 0 { | |
println(count) | |
} | |
if count == 100000 { | |
close(done) | |
} | |
case redis.Subscription: | |
case error: | |
return | |
} | |
} | |
}() | |
numPublishers := 1 | |
for i := 0; i < numPublishers; i++ { | |
conn, err := redis.Dial("tcp", "127.0.0.1:6379") | |
if err != nil { | |
panic(err) | |
} | |
defer conn.Close() | |
go func(c redis.Conn) { | |
for { | |
_, err = c.Do("PUBLISH", "channel1", "opa") | |
if err != nil { | |
println(err.Error()) | |
} | |
} | |
}(conn) | |
} | |
<-done | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"strings" | |
r "github.com/dancannon/gorethink" | |
) | |
type Exchange struct { | |
asserted bool | |
name string | |
table r.Term | |
session *r.Session | |
subs map[string]chan struct{} | |
} | |
func NewExchange(name string, session *r.Session) *Exchange { | |
return &Exchange{ | |
name: name, | |
table: r.Table(name), | |
session: session, | |
subs: make(map[string]chan struct{}), | |
} | |
} | |
func (e *Exchange) Publish(topicKey string, payload string) error { | |
resp, err := e.table.Filter(map[string]interface{}{ | |
"topic": topicKey, | |
}).Update(map[string]interface{}{ | |
"topic": topicKey, "updated_on": r.Now(), "payload": payload, | |
}).RunWrite(e.session) | |
if err != nil { | |
return err | |
} | |
if resp.Replaced == 0 { | |
err := e.table.Insert(map[string]interface{}{ | |
"topic": topicKey, "payload": payload, "updated_on": r.Now(), | |
}).Exec(e.session, r.ExecOpts{NoReply: true}) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
func (e *Exchange) fullQuery(matchRegexp string) r.Term { | |
return e.table.Changes().Field("new_val").Filter(func(row r.Term) r.Term { | |
return row.Field("topic").Match(matchRegexp) | |
}) | |
} | |
func (e *Exchange) assertTable() error { | |
if e.asserted { | |
return nil | |
} | |
_, err := r.TableCreate(e.name, r.TableCreateOpts{ | |
Durability: "soft", | |
}).RunWrite(e.session) | |
if err != nil && !strings.Contains(err.Error(), "already exists") { | |
return err | |
} | |
e.asserted = true | |
return nil | |
} | |
type Message struct { | |
Topic string | |
Payload string | |
} | |
func (e *Exchange) Subscribe(topicKey string, match string, ch chan Message) error { | |
err := e.assertTable() | |
if err != nil { | |
return err | |
} | |
messages, err := e.fullQuery(match).Run(e.session) | |
if err != nil { | |
return err | |
} | |
closeCh := make(chan struct{}) | |
e.subs[topicKey] = closeCh | |
go func() { | |
defer messages.Close() | |
var message Message | |
for messages.Next(&message) { | |
select { | |
case ch <- message: | |
case <-closeCh: | |
return | |
} | |
} | |
}() | |
return nil | |
} | |
func (e *Exchange) Unsubscribe(topicKey string) { | |
if ch, ok := e.subs[topicKey]; ok { | |
close(ch) | |
} | |
} | |
func main() { | |
ch := make(chan Message, 4096) | |
var session *r.Session | |
session, err := r.Connect(r.ConnectOpts{ | |
Address: "localhost:28015", | |
}) | |
if err != nil { | |
panic(err) | |
} | |
exch := NewExchange("subscribe_demo", session) | |
err = exch.Subscribe("channel1", "channel1", ch) | |
if err != nil { | |
panic(err) | |
} | |
count := 0 | |
done := make(chan struct{}) | |
go func() { | |
for { | |
select { | |
case msg := <-ch: | |
if msg.Topic == "" { | |
panic("empty topic") | |
} | |
count++ | |
if count%1000 == 0 { | |
println(count) | |
} | |
if count == 100000 { | |
close(done) | |
} | |
} | |
} | |
}() | |
numPublishers := 1 | |
for i := 0; i < numPublishers; i++ { | |
pubSession, err := r.Connect(r.ConnectOpts{ | |
Address: "localhost:28015", | |
}) | |
if err != nil { | |
panic(err) | |
} | |
pubExch := NewExchange("subscribe_demo", pubSession) | |
go func(ex *Exchange) { | |
for { | |
err := ex.Publish("channel1", "opa1") | |
if err != nil { | |
println(err) | |
} | |
} | |
}(pubExch) | |
} | |
<-done | |
} |
@abserari hello, thanks! That was a long time ago when I wrote this – so I don't remember the actual difference in performance here. Rethinkdb code left as a prototype - so I never used RethinkDB in production and can't say about the difference from operational perspective too.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Appreciate your gist! Could you comment few words with the difference between the two database user experiences?