-
-
Save pontiyaraja/29130d6e0a033034a1c545cffc926fdd to your computer and use it in GitHub Desktop.
Example of using Redis PubSub and EventSource with golang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
eventsource "github.com/antage/eventsource/http" | |
redis "github.com/vmihailenco/redis" | |
"log" | |
"net/http" | |
) | |
// haltOnErr panics with err when err is non-nil and does nothing otherwise.
func haltOnErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
type subscriptionHandler struct { | |
index map[string]subscription | |
} | |
// maps a url to a redis Subscribe channel | |
type subscription struct { | |
pubsub * redis.PubSubClient | |
es eventsource.EventSource | |
ch chan * redis.Message | |
pubChan string | |
} | |
func createSubscription(sh * subscriptionHandler, pubChan * string){ | |
log.Printf("creating channel %s",*pubChan) | |
pubsub, err := redis.NewTCPClient(":6379","",-1).PubSubClient() | |
haltOnErr(err) | |
ch, err := pubsub.Subscribe(*pubChan) | |
haltOnErr(err) | |
es := eventsource.New(nil) | |
sh.index[*pubChan] = subscription{pubsub, es, ch, *pubChan} | |
} | |
// listen for published events and send to the EventSource | |
func listen(index subscription){ | |
for { | |
msg := <-index.ch | |
index.es.SendMessage(msg.Message, "", "") | |
log.Printf("message has been sent on %s (consumers: %d)", index.pubChan, index.es.ConsumersCount()) | |
} | |
} | |
func (sh *subscriptionHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request){ | |
pubChan := req.URL.Path[1:] | |
_, ok := sh.index[pubChan] | |
if ! ok { | |
createSubscription(sh, &pubChan) | |
defer sh.index[pubChan].pubsub.Close() | |
defer sh.index[pubChan].es.Close() | |
} | |
log.Printf("subscribed to %s",pubChan) | |
go listen(sh.index[pubChan]) | |
sh.index[pubChan].es.ServeHTTP(resp,req) | |
} | |
func main() { | |
streamer := new(subscriptionHandler) | |
streamer.index = make(map[string]subscription) | |
http.Handle("/events/", streamer) | |
err := http.ListenAndServe(":8080", nil) | |
haltOnErr(err) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://github.com/Yaffle/EventSource
/**
 * Subscribes to the server-sent-event stream for `channel` and logs activity
 * to the console: every message payload, the stream opening, and the stream
 * closing.
 *
 * When the browser has no EventSource support, a warning is logged and
 * nothing is subscribed.
 *
 * @param {string} channel - Channel name; appended to '/events/' to build the stream URL.
 */
function listen(channel) {
  if (!window.EventSource) {
    // TODO: resort to XHR polling. Until that exists there is no source to
    // attach listeners to, so stop here instead of throwing on `undefined`.
    console.warn("EventSource is not supported; not listening on " + channel);
    return;
  }

  var source = new EventSource('/events/' + channel);

  source.addEventListener('message', function(e) {
    console.log(e.data);
  }, false);

  source.addEventListener('open', function(e) {
    console.log("opened channel on " + channel);
  }, false);

  source.addEventListener('error', function(e) {
    // readyState is a property of the EventSource, not of the error event.
    if (source.readyState == EventSource.CLOSED) {
      console.log("closed channel on " + channel);
    }
  }, false);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment