Created
December 6, 2018 13:56
-
-
Save trusch/03ba95d6ec68fd11f3bfdc291e5431e9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package events | |
import (
	"encoding/json"
	"io"
	"net/http"
	"sync"

	"github.com/joomcode/errorx"
	"github.com/nats-io/go-nats-streaming"
	uuid "github.com/satori/go.uuid"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/websocket"
)
// Handler provides an HTTP handler to serve websocket requests to subscribe/unsubscribe to/from event streams | |
type Handler struct { | |
natsClient stan.Conn | |
} | |
// NewHandler creates a new http handler | |
// it takes the address of the nats server and the cluster id of the nats-streaming server as input | |
func NewHandler(addr, clusterID string) (http.Handler, error) { | |
cli, err := stan.Connect(clusterID, uuid.NewV4().String(), stan.NatsURL(addr)) | |
if err != nil { | |
return nil, errorx.Decorate(err, "failed to connect to nats") | |
} | |
return &Handler{cli}, nil | |
} | |
// ServeHTTP implements the http.Handler interface | |
func (handler *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
websocket.Handler(handler.Handle).ServeHTTP(w, r) | |
} | |
// Handle is the function which actually handles the successfully upgraded websocket connection | |
func (handler *Handler) Handle(ws *websocket.Conn) { | |
logrus.Infof("handle new client") | |
defer logrus.Infof("finished handling client") | |
conn := &Connection{ | |
handler, | |
ws, | |
make(map[string]stan.Subscription), | |
make(chan struct{}, 0), | |
} | |
conn.Handle() | |
} | |
// Connection represents a single websocket connection | |
type Connection struct { | |
handler *Handler | |
ws *websocket.Conn | |
subscriptions map[string]stan.Subscription | |
done chan struct{} | |
} | |
// Handle handles a single websocket connection | |
// it continuesly reads requests from the client: | |
// subscribe request: { type: 'suscribe', channel: 'example', seq: 123} | |
// unsubscribe request: { type: 'unsuscribe', channel: 'example'} | |
// if the client is subscribed to a topic the handler sends all events from that | |
// channel starting with the event with the specified sequence id + it will send events as they occur | |
func (c *Connection) Handle() { | |
requestBuf := make([]byte, 4096) | |
for { | |
// read and parse the client request | |
bs, err := c.ws.Read(requestBuf[:]) | |
if err != nil { | |
if err == io.EOF { | |
return | |
} | |
logrus.Error(err) | |
return | |
} | |
req := &clientRequest{} | |
if err = json.Unmarshal(requestBuf[:bs], req); err != nil { | |
logrus.Error(err) | |
return | |
} | |
logrus.Infof("got new client request: %+v", req) | |
if req.Type == "subscribe" { | |
// client want a new subscription | |
if _, ok := c.subscriptions[req.Channel]; ok { | |
logrus.Warnf("already subscribed to %v", req.Channel) | |
continue | |
} | |
// ok, so we setup a subscription to the nats channel and setup a handler | |
// which sends the events to the client | |
s, err := c.handler.natsClient.Subscribe(req.Channel, func(m *stan.Msg) { | |
evt := &event{ | |
Channel: req.Channel, | |
Seq: m.Sequence, | |
} | |
err := json.Unmarshal(m.Data, &evt.Data) | |
if err != nil { | |
// the event data is not valid json, we require that! | |
logrus.Error(err) | |
c.cleanSubscription(req.Channel) | |
c.ws.Close() | |
return | |
} | |
bs, err := json.Marshal(evt) | |
if err != nil { | |
// I dont know how this could happen, but if we fail to marshal the event, we close the websocket | |
logrus.Error(err) | |
c.cleanSubscription(req.Channel) | |
c.ws.Close() | |
return | |
} | |
_, err = c.ws.Write(bs) | |
if err != nil { | |
// it seems like the client closed the connection, so cleanup and exit | |
logrus.Error(err) | |
c.cleanSubscription(req.Channel) | |
c.ws.Close() | |
return | |
} | |
}, stan.StartAtSequence(req.Seq)) | |
if err != nil { | |
// we failed to subscribe to the channel, its most likely that the channel identifier is malformed | |
logrus.Error(err) | |
c.ws.Close() | |
} | |
// lets save the subscription | |
c.subscriptions[req.Channel] = s | |
logrus.Infof("successfully subscribed client from %v", req.Channel) | |
} else { | |
// clients wants to unsubscribe -> close the subscription if any | |
c.cleanSubscription(req.Channel) | |
logrus.Infof("unsubscribed client from %v", req.Channel) | |
} | |
} | |
} | |
// cleanSubscription closes a subscription if any and removes it from the subscriptions list | |
func (c *Connection) cleanSubscription(topic string) { | |
if s, ok := c.subscriptions[topic]; ok { | |
s.Close() | |
delete(c.subscriptions, topic) | |
} | |
} | |
// clientRequest is the JSON message a client sends over the websocket to
// subscribe to or unsubscribe from a channel.
type clientRequest struct {
	// Type is the requested action, e.g. "subscribe".
	Type string `json:"type"`
	// Channel is the name of the channel the request refers to.
	Channel string `json:"channel"`
	// Seq is the sequence id at which a new subscription starts.
	Seq uint64 `json:"seq"`
}
// event is the JSON message the server sends to a client for every message
// received on a subscribed channel.
type event struct {
	// Channel is the name of the channel the event was published on.
	Channel string `json:"channel"`
	// Data is the decoded event payload; it has to be a JSON object.
	Data map[string]interface{} `json:"data"`
	// Seq is the sequence id of the event within its channel.
	Seq uint64 `json:"seq"`
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment