Created
April 3, 2014 04:57
-
-
Save craigmj/9948487 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package webutil | |
import ( | |
"net/http" | |
"time" | |
) | |
// ByteWriter wraps an http.ResponseWriter and remembers the first write
// error in Err, so that several chunks can be written back to back and the
// error checked once afterwards.
type ByteWriter struct {
	http.ResponseWriter
	// Err holds the error returned by the first failed write, or nil.
	Err error
}

// Write sends each chunk in b to the wrapped ResponseWriter, in order.
//
// As soon as Err is non-nil (whether set by this call or left over from an
// earlier one) the remaining chunks are skipped. Nothing is returned; callers
// inspect Err instead. This variadic method shadows the embedded
// ResponseWriter's Write, so the underlying writer is reached explicitly.
func (w *ByteWriter) Write(b ...[]byte) {
	for i := 0; i < len(b) && w.Err == nil; i++ {
		_, w.Err = w.ResponseWriter.Write(b[i])
	}
}
// A Sse is a wrapper over a Server-Sent Events request. | |
// | |
// Aside from providing helpers to send events in the SSE format, it also starts | |
// a background goroutine that sends heartbeats to the client on the other side, | |
// in an attempt to detect connection failures. | |
// Users are notified of such failures by listening on the ConnClosed channel. | |
// | |
// Note that users are expected to call Close in order to cleanup the background | |
// goroutine and other resources including time.Tickers when they do not need | |
// the Sse anymore. | |
// Moreover, not calling Close might result in crashes since the background | |
// goroutine calls Sse.Write when sending heartbeats, which in turn calls Flush. | |
// This causes the http package to *panic* since in this case, | |
// http.Flusher.Flush is called after the http handlers return. | |
// https://groups.google.com/d/msg/Golang-Nuts/qcjLQ4O8Pc4/BrgYJF4mENMJ | |
type Sse struct { | |
ByteWriter | |
stopTicker chan bool | |
ConnClosed chan bool | |
} | |
func NewServerSideEventWriter(w http.ResponseWriter, heartbeat string, d time.Duration) Sse { | |
headers := w.Header() | |
headers.Set("Content-Type", "text/event-stream") | |
headers.Set("Cache-Control", "no-cache") | |
headers.Set("Connection", "keep-alive") | |
ticker := time.NewTicker(d) | |
sse := Sse{ByteWriter: ByteWriter{w,nil}, stopTicker: make(chan bool, 1), ConnClosed: make(chan bool, 1)} | |
go func() { | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ticker.C: | |
err := sse.EventWrite(heartbeat, []byte{}) | |
if err != nil { | |
select { | |
case sse.ConnClosed <- true: | |
default: | |
} | |
return | |
} | |
case <-sse.stopTicker: | |
return | |
} | |
} | |
}() | |
return sse | |
} | |
func (sse *Sse) Close() { | |
select { | |
case sse.stopTicker <- true: | |
default: | |
} | |
} | |
func (sse *Sse) Write(b []byte) error { | |
sse.ByteWriter.Write([]byte("data: "), b, []byte("\n\n")) | |
if sse.Err != nil { | |
return sse.Err | |
} | |
if f, ok := sse.ByteWriter.ResponseWriter.(http.Flusher); ok { | |
f.Flush() | |
} | |
return nil | |
} | |
func (sse *Sse) EventWrite(event string, b []byte) error { | |
sse.ByteWriter.Write([]byte("event: "), []byte(event), []byte("\n")) | |
if sse.Err != nil { | |
return sse.Err | |
} | |
return sse.Write(b) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment