Skip to content

Instantly share code, notes, and snippets.

@craigmj
Created April 3, 2014 04:57
Show Gist options
  • Save craigmj/9948487 to your computer and use it in GitHub Desktop.
Save craigmj/9948487 to your computer and use it in GitHub Desktop.
package webutil
import (
	"bytes"
	"net/http"
	"sync"
	"time"
)
// ByteWriter wraps an http.ResponseWriter and remembers the first write
// error it encounters, so callers can issue several writes in a row and
// inspect Err once at the end.
type ByteWriter struct {
	// ResponseWriter is the destination of every chunk.
	http.ResponseWriter
	// Err holds the first error returned by the underlying writer. Once it
	// is non-nil, further calls to Write do nothing.
	Err error
}

// Write sends each chunk of b, in order, to the underlying ResponseWriter.
// It stops at the first failure and records that error in Err; if Err is
// already set when Write is called, nothing is written at all.
//
// Note that this variadic Write shadows the embedded ResponseWriter's Write,
// so a ByteWriter is not itself an io.Writer.
func (w *ByteWriter) Write(b ...[]byte) {
	for i := 0; i < len(b) && w.Err == nil; i++ {
		_, err := w.ResponseWriter.Write(b[i])
		w.Err = err
	}
}
// An Sse is a wrapper over a Server-Sent Events response.
//
// Besides helpers that emit events in the SSE wire format, it runs a
// background goroutine that periodically sends a heartbeat event to the
// client in an attempt to detect connection failures. A failed heartbeat
// write is reported (at most once) on the ConnClosed channel.
//
// Events written through Write and EventWrite are serialized with the
// heartbeats, so a heartbeat can never land in the middle of another event.
//
// Users must call Close before the HTTP handler returns. Close stops the
// heartbeat goroutine and waits for it to exit; without it the goroutine
// would keep writing to (and flushing) the http.ResponseWriter after the
// handler has returned, which makes the http package *panic*.
// https://groups.google.com/d/msg/Golang-Nuts/qcjLQ4O8Pc4/BrgYJF4mENMJ
type Sse struct {
	ByteWriter
	stopTicker chan bool
	ConnClosed chan bool

	// mu serializes whole events. It is a pointer so that the value returned
	// by NewServerSideEventWriter and the heartbeat goroutine's private copy
	// share a single lock. It is nil for an Sse not built by the constructor.
	mu *sync.Mutex
	// done is closed when the heartbeat goroutine exits. It is nil when no
	// heartbeat goroutine was started.
	done chan struct{}
}

// NewServerSideEventWriter sets the SSE response headers on w and returns an
// Sse that writes to it.
//
// Every d, an event named heartbeat with an empty payload is sent to the
// client. If such a write fails, true is sent on ConnClosed (without
// blocking) and the heartbeats stop. A non-positive d disables heartbeats
// altogether instead of panicking inside time.NewTicker.
//
// The caller must call Close on the returned value before the HTTP handler
// returns.
func NewServerSideEventWriter(w http.ResponseWriter, heartbeat string, d time.Duration) Sse {
	headers := w.Header()
	headers.Set("Content-Type", "text/event-stream")
	headers.Set("Cache-Control", "no-cache")
	headers.Set("Connection", "keep-alive")

	sse := Sse{
		ByteWriter: ByteWriter{ResponseWriter: w},
		stopTicker: make(chan bool, 1),
		ConnClosed: make(chan bool, 1),
		mu:         &sync.Mutex{},
	}
	if d <= 0 {
		// time.NewTicker panics on a non-positive duration.
		return sse
	}
	sse.done = make(chan struct{})

	// The goroutine works on its own copy: it shares the channels, the lock
	// and the ResponseWriter with the returned value, but keeps a separate
	// sticky Err so the caller's error state is never written concurrently.
	hb := sse
	ticker := time.NewTicker(d)
	go func() {
		defer close(hb.done)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := hb.EventWrite(heartbeat, nil); err != nil {
					// Non-blocking: nobody may be listening, and one
					// notification is enough.
					select {
					case hb.ConnClosed <- true:
					default:
					}
					return
				}
			case <-hb.stopTicker:
				return
			}
		}
	}()
	return sse
}

// Close stops the heartbeat goroutine, if any, and waits until it has
// exited, so that no heartbeat can touch the ResponseWriter once Close has
// returned. If a heartbeat write is in progress, Close blocks until that
// write completes. Close may be called more than once.
func (sse *Sse) Close() {
	// Non-blocking send: the goroutine may already be gone, or a previous
	// Close may already have queued the stop signal.
	select {
	case sse.stopTicker <- true:
	default:
	}
	if sse.done != nil {
		<-sse.done
	}
}

// Write sends b to the client as the data of one event and flushes the
// response if the ResponseWriter supports it.
//
// A payload spanning several lines (separated by "\n", "\r\n" or "\r") is
// sent as one "data:" field per line, as the SSE format requires; the client
// reassembles them joined by "\n". The first write error is returned, and
// returned again by every later call.
func (sse *Sse) Write(b []byte) error {
	defer sse.lock()()
	return sse.writeData(b)
}

// EventWrite sends b to the client as the data of an event of type event.
// The event name must not contain line breaks. Errors are reported as in
// Write.
func (sse *Sse) EventWrite(event string, b []byte) error {
	// Hold the lock across both the "event:" line and the data so that the
	// frame cannot be split by a concurrent heartbeat.
	defer sse.lock()()
	sse.ByteWriter.Write([]byte("event: "), []byte(event), []byte("\n"))
	if sse.Err != nil {
		return sse.Err
	}
	return sse.writeData(b)
}

// lock acquires the event mutex, if there is one, and returns the function
// that releases it.
func (sse *Sse) lock() (unlock func()) {
	if sse.mu == nil {
		return func() {}
	}
	sse.mu.Lock()
	return sse.mu.Unlock
}

// writeData emits the data lines and the blank line that terminates an
// event, then flushes. The caller must hold the event lock.
func (sse *Sse) writeData(b []byte) error {
	for _, line := range sseDataLines(b) {
		sse.ByteWriter.Write([]byte("data: "), line, []byte("\n"))
	}
	sse.ByteWriter.Write([]byte("\n"))
	if sse.Err != nil {
		return sse.Err
	}
	if f, ok := sse.ByteWriter.ResponseWriter.(http.Flusher); ok {
		f.Flush()
	}
	return nil
}

// sseDataLines splits an event payload into the lines that each need their
// own "data:" field. A payload without line breaks is returned as is.
func sseDataLines(b []byte) [][]byte {
	if bytes.IndexAny(b, "\r\n") < 0 {
		return [][]byte{b}
	}
	lf := []byte("\n")
	b = bytes.Replace(b, []byte("\r\n"), lf, -1)
	b = bytes.Replace(b, []byte("\r"), lf, -1)
	return bytes.Split(b, lf)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment