Skip to content

Instantly share code, notes, and snippets.

@adoublef
Last active December 6, 2024 22:23
Show Gist options
  • Save adoublef/67ea6b66e84f1acdfbde8d8ab4bf67de to your computer and use it in GitHub Desktop.
Save adoublef/67ea6b66e84f1acdfbde8d8ab4bf67de to your computer and use it in GitHub Desktop.
server-sent events
// Copyright 2024 Kristopher Rahim Afful-Brown. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package sse reads server-sent event streams from an io.Reader.
package sse
import (
"bufio"
"context"
"fmt"
"io"
"net/textproto"
"strings"
)
// Event is one item produced while reading an event stream: either a parsed
// "key: value" line or an error.
type Event struct {
	// Key is the field name, taken from the text before the first ':' of a
	// line.
	Key string

	// Val is the field value, taken from the text that follows the ':'
	// separator.
	Val []byte

	// Err is non-nil when the item reports a failure (a malformed line, a
	// read error, or a cancelled context) instead of a parsed line. Key and
	// Val are left at their zero values in that case.
	Err error
}
// Events is shorthand for EventsN with a read limit of 64 KiB.
func Events(ctx context.Context, r io.Reader) <-chan Event {
	// defaultLimit is the number of bytes handed to EventsN as its limit.
	const defaultLimit int64 = 64 << 10

	return EventsN(ctx, r, defaultLimit)
}
// EventsN starts a goroutine that reads r line by line and delivers every
// non-empty line on the returned channel as an Event.
//
// At most limit bytes are read from r in total (the limit applies to the
// whole stream, not to a single line); once it is reached the stream ends as
// if r had been exhausted.
//
// Each line is split at its first ':'. The text before it becomes Key and the
// text after it becomes Val, both with surrounding white space removed, so
// "data: x", "data:x" and "data:" are all accepted. A line that starts with
// ':' is delivered with an empty Key. A line without any ':' is delivered as
// an Event whose Err describes the malformed line, and reading continues.
// Blank lines are skipped.
//
// The channel is closed when r is exhausted, when reading fails (the error,
// unless it is io.EOF, is delivered first), or when ctx is cancelled.
// Cancellation is checked between lines and while waiting for the receiver;
// a Read on r that is already blocked is not interrupted. When ctx is
// cancelled, ctx.Err() is delivered on a best-effort basis: it is dropped if
// the receiver is not keeping up, so that an abandoned channel never keeps
// the goroutine alive.
func EventsN(ctx context.Context, r io.Reader, limit int64) <-chan Event {
	ch := make(chan Event, 1)
	go func() {
		defer close(ch)

		// send delivers ev unless ctx is cancelled first, so the goroutine
		// cannot block forever on a receiver that has stopped reading.
		send := func(ev Event) bool {
			select {
			case ch <- ev:
				return true
			case <-ctx.Done():
				return false
			}
		}
		// reportCancel hands ctx.Err() to the receiver without blocking.
		reportCancel := func() {
			select {
			case ch <- Event{Err: ctx.Err()}:
			default:
			}
		}

		tr := textproto.NewReader(bufio.NewReader(io.LimitReader(r, limit)))
		for {
			if ctx.Err() != nil {
				reportCancel()
				return
			}

			line, err := tr.ReadLine()
			if err != nil {
				// io.EOF is the normal end of the stream and is not reported.
				if err != io.EOF && !send(Event{Err: err}) {
					reportCancel()
				}
				return
			}
			if line == "" { // blank lines carry no field
				continue
			}

			var ev Event
			if key, val, ok := strings.Cut(line, ":"); ok {
				// Slicing right after the ':' (rather than assuming a
				// following space) keeps "data:x" intact and cannot run
				// past the end of a line such as "data:".
				ev = Event{
					Key: strings.TrimSpace(key),
					Val: []byte(strings.TrimSpace(val)),
				}
			} else {
				ev = Event{Err: fmt.Errorf("malformed line: %q", line)}
			}
			if !send(ev) {
				reportCancel()
				return
			}
		}
	}()
	return ch
}
// Copyright 2024 Kristopher Rahim Afful-Brown. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package http serves a sample server-sent event stream over HTTP.
package http
// handleServeSentEvents returns a handler that writes a short, fixed
// server-sent event stream: one comment line followed by one "data" event per
// token. Every chunk is flushed as soon as it is written when the
// ResponseWriter implements http.Flusher, and the handler stops early if a
// write fails (for example because the client disconnected).
//
// Read more:
// https://chriswilcox.dev/blog/2024/04/09/Scan-vs-Read-in-bufio.html
// https://nowotarski.info/golang-textproto-reader/
func handleServeSentEvents() http.HandlerFunc {
	// respond writes one formatted chunk to w and flushes it, returning the
	// write error, if any.
	respond := func(w http.ResponseWriter, format string, v ...any) error {
		if _, err := fmt.Fprintf(w, format, v...); err != nil {
			return err
		}
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
		return nil
	}
	return func(w http.ResponseWriter, r *http.Request) {
		h := w.Header()
		// Event-stream clients only accept this exact media type.
		h.Set("Content-Type", "text/event-stream")
		h.Set("Cache-Control", "no-cache")
		h.Set("Connection", "keep-alive")
		w.WriteHeader(http.StatusOK)

		// A line starting with ':' is a comment in the event-stream format.
		if err := respond(w, ": this is a test stream\n\n"); err != nil {
			return
		}
		tokens := []string{"this", "is", "a", "server", "sent", "event", "message"}
		for _, token := range tokens {
			if err := respond(w, "data: %s\n\n", token); err != nil {
				return // nothing more can be delivered on this connection
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment