-
-
Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
// Example SSE server in Golang. | |
// $ go run sse.go | |
// Inspired from https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c | |
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"log" | |
"net/http" | |
"github.com/gorilla/mux" | |
) | |
// Broker is the hub of the SSE server. It holds the registry of connected
// clients and the channels used to register clients, deregister them, and
// hand in events for broadcasting. The channels and the registry are
// allocated by NewServer and serviced by the listen loop.
type Broker struct {
	// Events are pushed to this channel by the main events-gathering routine;
	// listen forwards each one to every registered client channel.
	Notifier chan []byte
	// New client connections are pushed to this channel.
	newClients chan chan []byte
	// Closed client connections are pushed to this channel.
	closingClients chan chan []byte
	// Client connections registry, keyed by each client's message channel.
	clients map[chan []byte]bool
}
func NewServer() (broker *Broker) { | |
// Instantiate a broker | |
broker = &Broker{ | |
Notifier: make(chan []byte, 1), | |
newClients: make(chan chan []byte), | |
closingClients: make(chan chan []byte), | |
clients: make(map[chan []byte]bool), | |
} | |
// Set it running - listening and broadcasting events | |
go broker.listen() | |
return | |
} | |
func (broker *Broker) listen() { | |
for { | |
select { | |
case s := <-broker.newClients: | |
// A new client has connected. | |
// Register their message channel | |
broker.clients[s] = true | |
log.Printf("Client added. %d registered clients", len(broker.clients)) | |
case s := <-broker.closingClients: | |
// A client has dettached and we want to | |
// stop sending them messages. | |
delete(broker.clients, s) | |
log.Printf("Removed client. %d registered clients", len(broker.clients)) | |
case event := <-broker.Notifier: | |
// We got a new event from the outside! | |
// Send event to all connected clients | |
for clientMessageChan := range broker.clients { | |
clientMessageChan <- event | |
} | |
} | |
} | |
} | |
// Message is the JSON payload that BroadcastMessage decodes from the request
// body and re-encodes before handing it to the broker.
type Message struct {
	// Name is read from and written to the "name" JSON field.
	Name string `json:"name"`
	// Message is read from and written to the "msg" JSON field.
	Message string `json:"msg"`
}
func (broker *Broker) Stream(w http.ResponseWriter, r *http.Request) { | |
// Check if the ResponseWriter supports flushing. | |
flusher, ok := w.(http.Flusher) | |
if !ok { | |
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) | |
return | |
} | |
// Each connection registers its own message channel with the Broker's connections registry | |
messageChan := make(chan []byte) | |
// Signal the broker that we have a new connection | |
broker.newClients <- messageChan | |
// Remove this client from the map of connected clients | |
// when this handler exits. | |
defer func() { | |
broker.closingClients <- messageChan | |
}() | |
w.Header().Set("Content-Type", "text/event-stream") | |
w.Header().Set("Cache-Control", "no-cache") | |
w.Header().Set("Connection", "keep-alive") | |
w.Header().Set("Access-Control-Allow-Origin", "*") | |
for { | |
select { | |
// Listen to connection close and un-register messageChan | |
case <-r.Context().Done(): | |
// remove this client from the map of connected clients | |
broker.closingClients <- messageChan | |
return | |
// Listen for incoming messages from messageChan | |
case msg := <-messageChan: | |
// Write to the ResponseWriter | |
// Server Sent Events compatible | |
fmt.Fprintf(w, "data: %s\n\n", msg) | |
// Flush the data immediatly instead of buffering it for later. | |
flusher.Flush() | |
} | |
} | |
} | |
func (broker *Broker) BroadcastMessage(w http.ResponseWriter, r *http.Request) { | |
// Parse the request body | |
var msg Message | |
err := json.NewDecoder(r.Body).Decode(&msg) | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusBadRequest) | |
return | |
} | |
// Send the message to the broker via Notifier channel | |
j, err := json.Marshal(msg) | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
broker.Notifier <- []byte(j) | |
w.WriteHeader(http.StatusCreated) | |
w.Write([]byte("Message sent")) | |
} | |
func main() { | |
broker := NewServer() | |
router := mux.NewRouter() | |
router.HandleFunc("/messages", broker.BroadcastMessage).Methods("POST") | |
router.HandleFunc("/stream", broker.Stream).Methods("GET") | |
log.Println("Starting server on :8000") | |
log.Fatal(http.ListenAndServe(":8000", router)) | |
} | |
// To test the server, run the following commands in separate terminals: | |
// Start listening to the stream | |
// $ curl -N http://localhost:8000/stream | |
// Send a message | |
// $ curl -X POST -H "Content-Type: application/json" -d '{"name": "Alice", "msg": "Hello"}' http://localhost:8000/messages |
Hi @benc-uk, thanks! This is actually an old example (and most of it is copied :D). I wanted to update it but kept forgetting (your comment reminded me!). We might want to use a sync.Map for the clients registry. We could also add a channel example, as in: subscribe to a /channel/<name>
and publish to a /channel/<name>
which is a more useful example :D
Hey, thanks for the implementation @Ananto30, it's great!
One thing http.CloseNotifier
from notify := w.(http.CloseNotifier).CloseNotify()
in Broker.Stream()
is deprecated now. Could you update the example to use [Request.Context]
instead?
For me this is the first result when I search for "gorilla mux sse" on Google, so it would be nice to have this example up to date.
I would add a MR but I don't know how to do it here.
Here is my suggestion how the updated code could look:
for {
select {
// Listen to connection close and un-register messageChan
case <-r.Context().Done():
// remove this client from the map of connected clients
broker.closingClients <- messageChan
return
// Listen for incoming messages from messageChan
case msg := <-messageChan:
// Write to the ResponseWriter
// Server Sent Events compatible
fmt.Fprintf(w, "data: %s\n\n", msg)
// Flush the data immediatly instead of buffering it for later.
flusher.Flush()
}
}
Also I have a question. If we return from the Broker.Stream()
method when the connection is closed and we send the msgChannel
to Broker.ClosingClients
manually, does it make sense to also send it in the defer function? Aren't we sending it twice?
I think we should either just return when the connection closes and let the defer handle unregistering the client from the broker, or unregister the client manually when the connection closes and not use defer. This is not reflected in my suggestion though, so if you are going to update the example, could you also take a look at this?
Have a nice day and thanks for your work!
Hi @MagicMajky thanks a lot for the suggestions, the code is updated! 🙌
Hey, this is absolutely fantastic. Saved me hours (probably days!) of hassle & pain juggling channels and connect closing problems.
Thanks for sharing. Nice clear code too!