Skip to content

Instantly share code, notes, and snippets.

@Ananto30
Last active September 5, 2024 23:04
Show Gist options
  • Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
SSE message stream in Go
// Example SSE server in Golang.
// $ go run sse.go
// Inspired by https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
// Broker keeps the registry of connected stream clients and relays every
// event received on Notifier to each of them. All access to the registry
// happens inside the listen loop, which is fed through the channels below.
type Broker struct {
// Notifier carries events to broadcast. BroadcastMessage sends on it and
// the listen loop receives from it.
Notifier chan []byte
// newClients carries the message channel of each client that connects;
// Stream sends on it and the listen loop adds the channel to clients.
newClients chan chan []byte
// closingClients carries the message channel of each client that
// disconnects; the listen loop removes the channel from clients.
closingClients chan chan []byte
// clients is the registry of connected clients, keyed by each client's
// message channel. The bool value is always set to true.
clients map[chan []byte]bool
}
// NewServer creates a Broker with its channels and client registry
// allocated, starts the broker's listen loop in its own goroutine, and
// returns the broker.
func NewServer() (broker *Broker) {
	broker = new(Broker)

	// Notifier gets a buffer of one; the registration channels are unbuffered.
	broker.Notifier = make(chan []byte, 1)
	broker.newClients = make(chan chan []byte)
	broker.closingClients = make(chan chan []byte)
	broker.clients = make(map[chan []byte]bool)

	// Start listening for registrations and broadcasting events.
	go broker.listen()

	return broker
}
// listen is the broker's event loop. It runs forever and is the only place
// that touches broker.clients, so the map needs no lock. It handles three
// kinds of input:
//
//   - a channel on newClients is added to the registry;
//   - a channel on closingClients is removed from the registry;
//   - an event on Notifier is delivered to every registered client.
func (broker *Broker) listen() {
	for {
		select {
		case s := <-broker.newClients:
			// A new client has connected: register its message channel.
			broker.clients[s] = true
			log.Printf("Client added. %d registered clients", len(broker.clients))
		case s := <-broker.closingClients:
			// A client has detached and must stop receiving messages.
			broker.removeClient(s)
		case event := <-broker.Notifier:
			// A new event arrived from the outside: send it to every
			// connected client. deliver may delete entries from the map
			// while we range over it; Go guarantees that entries removed
			// before being reached are simply not produced.
			for clientMessageChan := range broker.clients {
				broker.deliver(clientMessageChan, event)
			}
		}
	}
}

// removeClient drops s from the registry and logs the new client count.
// Removing a channel that is not registered is a no-op apart from the log.
func (broker *Broker) removeClient(s chan []byte) {
	delete(broker.clients, s)
	log.Printf("Removed client. %d registered clients", len(broker.clients))
}

// deliver sends event to client, and keeps servicing closingClients while it
// waits for the client to receive.
//
// Client channels are unbuffered, so a plain send blocks until the handler
// reads. A handler that is shutting down no longer reads; it is blocked
// sending on closingClients instead. Without the second select case the
// broker and that handler would wait on each other forever and the whole
// broker would stall. If the client being delivered to is the one that
// closed, delivery is abandoned.
func (broker *Broker) deliver(client chan []byte, event []byte) {
	for {
		select {
		case client <- event:
			return
		case s := <-broker.closingClients:
			broker.removeClient(s)
			if s == client {
				return
			}
		}
	}
}
// Message is the JSON payload that BroadcastMessage decodes from the request
// body and re-encodes before handing it to the broker.
type Message struct {
// Name is read from and written to the JSON key "name".
Name string `json:"name"`
// Message is read from and written to the JSON key "msg".
Message string `json:"msg"`
}
// Stream is the HTTP handler for the event stream. It registers a fresh
// message channel with the broker, then writes every message received on
// that channel to the response in Server-Sent Events format
// ("data: <payload>\n\n"), flushing after each one. The handler returns when
// the request context is done or a write to the client fails, and the
// client is unregistered from the broker exactly once on the way out.
//
// If w does not implement http.Flusher the handler replies with
// 500 Internal Server Error and does not register a client.
func (broker *Broker) Stream(w http.ResponseWriter, r *http.Request) {
	// Check if the ResponseWriter supports flushing.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	ctx := r.Context()

	// Each connection registers its own message channel with the broker's
	// connections registry.
	messageChan := make(chan []byte)

	// Signal the broker that we have a new connection. Give up if the
	// request is cancelled before the broker gets around to accepting it, so
	// the handler cannot hang on a busy broker after the client has gone.
	select {
	case broker.newClients <- messageChan:
	case <-ctx.Done():
		return
	}

	// Remove this client from the map of connected clients when the handler
	// exits. This is the only place that unregisters, so the broker sees a
	// single removal per client. While waiting for the broker to accept the
	// removal, keep draining messageChan: the broker may be blocked sending
	// an event to this very channel, and if nobody received it the broker
	// and this handler would wait on each other forever.
	defer func() {
		for {
			select {
			case broker.closingClients <- messageChan:
				return
			case <-messageChan:
				// Discard: the client is gone.
			}
		}
	}()

	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	for {
		select {
		case <-ctx.Done():
			// The connection was closed; the deferred function unregisters
			// messageChan.
			return
		case msg := <-messageChan:
			// Write the message in Server-Sent Events format. A failed write
			// means the client can no longer be reached, so stop streaming.
			if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
				return
			}
			// Flush the data immediately instead of buffering it for later.
			flusher.Flush()
		}
	}
}
// BroadcastMessage is the HTTP handler that accepts a JSON-encoded Message
// in the request body and hands it to the broker for delivery to all stream
// clients.
//
// Responses:
//   - 400 Bad Request if the body cannot be decoded into a Message;
//   - 500 Internal Server Error if the message cannot be re-encoded;
//   - 201 Created with the body "Message sent" once the broker's Notifier
//     channel has accepted the message.
//
// If the request is cancelled while waiting for the broker to accept the
// message, the handler returns without writing a response.
func (broker *Broker) BroadcastMessage(w http.ResponseWriter, r *http.Request) {
	// Parse the request body.
	var msg Message
	if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Re-encode so that clients only ever receive the known Message fields.
	payload, err := json.Marshal(msg)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Send the message to the broker via the Notifier channel. The send can
	// block while the broker is busy, so stop waiting if the client goes away
	// rather than leaving this handler's goroutine blocked.
	select {
	case broker.Notifier <- payload:
	case <-r.Context().Done():
		return
	}

	w.WriteHeader(http.StatusCreated)
	if _, err := w.Write([]byte("Message sent")); err != nil {
		// The status line is already sent; all that is left is to record it.
		log.Printf("writing broadcast response: %v", err)
	}
}
// main wires the broker's two handlers into a gorilla/mux router and serves
// them on port 8000: POST /messages broadcasts a message, GET /stream opens
// an event stream. The process exits through log.Fatal when the server stops.
func main() {
	b := NewServer()

	routes := mux.NewRouter()
	routes.HandleFunc("/messages", b.BroadcastMessage).Methods("POST")
	routes.HandleFunc("/stream", b.Stream).Methods("GET")

	log.Println("Starting server on :8000")

	// ListenAndServe only returns on failure, and always with a non-nil error.
	err := http.ListenAndServe(":8000", routes)
	log.Fatal(err)
}
// To test the server, run the following commands in separate terminals:
// Start listening to the stream
// $ curl -N http://localhost:8000/stream
// Send a message
// $ curl -X POST -H "Content-Type: application/json" -d '{"name": "Alice", "msg": "Hello"}' http://localhost:8000/messages
@Ananto30
Copy link
Author

Ananto30 commented May 5, 2024

Hi @MagicMajky thanks a lot for the suggestions, the code is updated! 🙌

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment