Skip to content

Instantly share code, notes, and snippets.

@Ananto30
Last active September 5, 2024 23:04
Show Gist options
  • Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
SSE message stream in Go
// Example SSE server in Golang.
// $ go run sse.go
// Inspired from https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
type Broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier chan []byte
// New client connections are pushed to this channel
newClients chan chan []byte
// Closed client connections are pushed to this channel
closingClients chan chan []byte
// Client connections registry
clients map[chan []byte]bool
}
func NewServer() (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan []byte, 1),
newClients: make(chan chan []byte),
closingClients: make(chan chan []byte),
clients: make(map[chan []byte]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}
func (broker *Broker) listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
log.Printf("Client added. %d registered clients", len(broker.clients))
case s := <-broker.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
log.Printf("Removed client. %d registered clients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
clientMessageChan <- event
}
}
}
}
type Message struct {
Name string `json:"name"`
Message string `json:"msg"`
}
func (broker *Broker) Stream(w http.ResponseWriter, r *http.Request) {
// Check if the ResponseWriter supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- messageChan
}()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
for {
select {
// Listen to connection close and un-register messageChan
case <-r.Context().Done():
// remove this client from the map of connected clients
broker.closingClients <- messageChan
return
// Listen for incoming messages from messageChan
case msg := <-messageChan:
// Write to the ResponseWriter
// Server Sent Events compatible
fmt.Fprintf(w, "data: %s\n\n", msg)
// Flush the data immediatly instead of buffering it for later.
flusher.Flush()
}
}
}
func (broker *Broker) BroadcastMessage(w http.ResponseWriter, r *http.Request) {
// Parse the request body
var msg Message
err := json.NewDecoder(r.Body).Decode(&msg)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Send the message to the broker via Notifier channel
j, err := json.Marshal(msg)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
broker.Notifier <- []byte(j)
w.WriteHeader(http.StatusCreated)
w.Write([]byte("Message sent"))
}
func main() {
broker := NewServer()
router := mux.NewRouter()
router.HandleFunc("/messages", broker.BroadcastMessage).Methods("POST")
router.HandleFunc("/stream", broker.Stream).Methods("GET")
log.Println("Starting server on :8000")
log.Fatal(http.ListenAndServe(":8000", router))
}
// To test the server, run the following commands in separate terminals:
// Start listening to the stream
// $ curl -N http://localhost:8000/stream
// Send a message
// $ curl -X POST -H "Content-Type: application/json" -d '{"name": "Alice", "msg": "Hello"}' http://localhost:8000/messages
@Ananto30
Copy link
Author

Ananto30 commented Nov 4, 2023

Hi @benc-uk thanks, this is actually an old example (also most of it is copied :D) I wanted to update it but forgot time to time (your comment made me aware of this!). We might want to use a sync.Map for the clients registry. And we can have channel example as in, subscribe to a /channel/<name> and publish to a /channel/<name> which is more useful example :D

@MagicMajky
Copy link

MagicMajky commented Apr 26, 2024

Hey thanks for the implementation @Ananto30 its great!

One thing http.CloseNotifier from notify := w.(http.CloseNotifier).CloseNotify() in Broker.Stream() is deprecated now. Could you update the example to use [Request.Context] instead?

For me this is the first result when I search for "gorilla mux sse" on google so it would be nice to have this example uptodate.

I would add a MR but I don't know how to do it here.
Here is my suggestion how the updated code could look:

for {
    select {
    // Listen to connection close and un-register messageChan
    case <-r.Context().Done():
        // remove this client from the map of connected clients
        broker.closingClients <- messageChan
        return
	
    // Listen for incoming messages from messageChan
    case msg := <-messageChan:
        // Write to the ResponseWriter
        // Server Sent Events compatible
        fmt.Fprintf(w, "data: %s\n\n", msg)
        // Flush the data immediatly instead of buffering it for later.
        flusher.Flush()
    }
}

Also I have a question. If we return from the Broker.Stream() method when the connection is closed and we send the msgChannel to Broker.ClosingClients manually, does it make sense to also send it in the defer function? Aren't we sending it twice?
I think either just return when the connection closes and let the defer handle unregistering the client from the broker or unregister the client manually when the connection closes and don't use defer. This is not reflected in my suggestion tho, so if you are going to update it could you also take a look at this?

Have a nice day and thanks for your work!

@Ananto30
Copy link
Author

Ananto30 commented May 5, 2024

Hi @MagicMajky thanks a lot for the suggestions, the code is updated! 🙌

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment