Skip to content

Instantly share code, notes, and snippets.

@jweir
Last active April 19, 2021 03:08
Show Gist options
  • Select an option

  • Save jweir/4528042 to your computer and use it in GitHub Desktop.

Select an option

Save jweir/4528042 to your computer and use it in GitHub Desktop.
Example of using Redis Pub/Sub and EventSource (server-sent events) with Go
package main
import (
eventsource "github.com/antage/eventsource/http"
redis "github.com/vmihailenco/redis"
"log"
"net/http"
)
// haltOnErr panics with err when err is non-nil; a nil err is a no-op.
func haltOnErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// subscriptionHandler is an http.Handler that serves event streams backed by
// Redis subscriptions.
type subscriptionHandler struct {
	// index holds the known subscriptions, keyed by channel name.
	// It must be initialised with make before the handler is used.
	index map[string]subscription
}
// subscription ties one Redis subscribe channel to the EventSource that
// relays its messages to HTTP clients.
type subscription struct {
	// pubsub is the Redis pub/sub client the subscription was made on.
	pubsub *redis.PubSubClient
	// es is the EventSource that received messages are sent to.
	es eventsource.EventSource
	// ch delivers the messages of the subscribed Redis channel.
	ch chan *redis.Message
	// pubChan is the name of the subscribed Redis channel.
	pubChan string
}
// createSubscription subscribes to the Redis channel named by *pubChan,
// creates an EventSource for it and stores both in sh.index under that name.
// It panics (via haltOnErr) if the pub/sub client cannot be obtained or the
// subscribe call fails.
func createSubscription(sh *subscriptionHandler, pubChan *string) {
	name := *pubChan
	log.Printf("creating channel %s", name)

	client := redis.NewTCPClient(":6379", "", -1)
	pubsub, err := client.PubSubClient()
	haltOnErr(err)

	messages, err := pubsub.Subscribe(name)
	haltOnErr(err)

	sh.index[name] = subscription{
		pubsub:  pubsub,
		es:      eventsource.New(nil),
		ch:      messages,
		pubChan: name,
	}
}
// listen forwards every message received on the subscription's Redis channel
// to its EventSource and logs each delivery with the current consumer count.
//
// It blocks until index.ch is closed and then returns, so the goroutine
// running it ends instead of dereferencing the nil message that a closed
// channel yields.
func listen(index subscription) {
	for msg := range index.ch {
		if msg == nil {
			// Nothing to forward; a nil message would panic on msg.Message.
			continue
		}
		index.es.SendMessage(msg.Message, "", "")
		log.Printf("message has been sent on %s (consumers: %d)", index.pubChan, index.es.ConsumersCount())
	}
}
// ServeHTTP streams the events published on a Redis channel to the client.
//
// The channel name is the request path without its first character (the
// leading slash), so a request for "/events/foo" uses the channel
// "events/foo". A request with an empty path is answered with 404.
//
// The first request for a channel creates the subscription and starts the one
// goroutine that forwards Redis messages to the EventSource; later requests
// for the same channel reuse that subscription. When the request that created
// the subscription returns, the subscription is removed from the index and its
// EventSource and Redis pub/sub client are closed, so a later request builds a
// fresh subscription instead of reusing a closed one.
//
// NOTE(review): sh.index is read and written here without synchronization
// while net/http serves requests concurrently — guard it with a mutex.
func (sh *subscriptionHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if req.URL.Path == "" {
		// Slicing off the leading slash below would panic on an empty path.
		http.NotFound(resp, req)
		return
	}
	pubChan := req.URL.Path[1:]

	sub, ok := sh.index[pubChan]
	if !ok {
		createSubscription(sh, &pubChan)
		sub = sh.index[pubChan]

		// Exactly one forwarding goroutine per subscription, not one per request.
		go listen(sub)

		defer func() {
			// Drop the entry before closing so the closed subscription is
			// never handed to a later request.
			delete(sh.index, pubChan)
			sub.es.Close()
			sub.pubsub.Close()
		}()
	}

	log.Printf("subscribed to %s", pubChan)
	sub.es.ServeHTTP(resp, req)
}
// main registers a subscriptionHandler under "/events/" on the default mux
// and serves HTTP on port 8080. It panics if the server stops with an error.
func main() {
	streamer := &subscriptionHandler{
		index: make(map[string]subscription),
	}
	http.Handle("/events/", streamer)

	haltOnErr(http.ListenAndServe(":8080", nil))
}
// Browsers without native EventSource can load a polyfill such as
// https://github.com/Yaffle/EventSource
//
// listen opens an EventSource on '/events/<channel>' and logs every message,
// the opening of the stream, and its closing. When EventSource is not
// available it logs that fact and returns without subscribing.
function listen(channel){
  if (!window.EventSource) {
    // TODO: resort to XHR polling :(
    console.log("EventSource is not supported, cannot listen on "+channel);
    return;
  }

  var source = new window.EventSource('/events/'+channel);

  source.addEventListener('message', function(e) {
    console.log(e.data);
  }, false);

  source.addEventListener('open', function(e) {
    console.log("opened channel on "+channel)
  }, false);

  source.addEventListener('error', function(e) {
    // readyState is a property of the EventSource, not of the error event.
    if (source.readyState === window.EventSource.CLOSED) {
      console.log("closed channel on "+channel)
    }
  }, false);
}
@druska
Copy link
Copy Markdown

druska commented Dec 6, 2013

There is no need to pass immutable strings as pointers. You can also use redis to handle the subscriptions instead of a global variable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment