Skip to content

Instantly share code, notes, and snippets.

@tylertreat-wf
Created September 20, 2014 00:26
Show Gist options
  • Save tylertreat-wf/e707f1ba808fb850949d to your computer and use it in GitHub Desktop.
Save tylertreat-wf/e707f1ba808fb850949d to your computer and use it in GitHub Desktop.
vessel
package main
import (
"time"
"github.com/tylertreat-wf/vessel/vessel"
)
// main runs a small demo server: it creates a SockJS vessel mounted at "/vessel",
// registers a "foo" channel that answers every message with "ping", broadcasts
// "testing 123" on channel "baz" every five seconds, and serves on ":8081".
func main() {
	// Named v rather than vessel so the imported vessel package is not shadowed.
	v := vessel.NewSockJSVessel("/vessel")

	v.AddChannel("foo", func(msg string, c chan<- string, done chan<- bool) {
		c <- "ping"
		done <- true
	})

	// Periodic broadcaster; it lives for the whole process, so the ticker is
	// intentionally never stopped.
	go func() {
		for range time.Tick(5 * time.Second) {
			v.Broadcast("baz", "testing 123")
		}
	}()

	// Start only returns when serving fails (for example when the address is
	// already in use). Surface that instead of exiting silently with status 0.
	if err := v.Start(":8081"); err != nil {
		panic(err)
	}
}
package vessel
import (
	"fmt"
	"net/http"
	"sync"

	"github.com/igm/sockjs-go/sockjs"
)
type sockjsVessel struct {
uri string
sessions []sockjs.Session
channels map[string]Channel
marshaler marshaler
}
func NewSockJSVessel(uri string) Vessel {
return &sockjsVessel{
uri: uri,
channels: map[string]Channel{},
sessions: []sockjs.Session{},
marshaler: &jsonMarshaler{},
}
}
func (v *sockjsVessel) AddChannel(name string, channel Channel) {
v.channels[name] = channel
}
func (v *sockjsVessel) Start(portStr string) error {
handler := sockjs.NewHandler(v.uri, sockjs.DefaultOptions, v.handler())
return http.ListenAndServe(portStr, handler)
}
func (s *sockjsVessel) Broadcast(channel string, msg string) {
m := &message{
ID: nextID(),
Channel: channel,
Body: msg,
}
for _, session := range s.sessions {
if send, err := s.marshaler.marshal(m); err != nil {
fmt.Println(err)
} else {
sendStr := string(send)
fmt.Println("Send", sendStr)
session.Send(sendStr)
}
}
}
func (s *sockjsVessel) handler() func(sockjs.Session) {
return func(session sockjs.Session) {
s.sessions = append(s.sessions, session)
for {
msg, err := session.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println("Recv", msg)
recvMsg, err := s.marshaler.unmarshal([]byte(msg))
if err != nil {
fmt.Println(err)
continue
}
channelHandler, ok := s.channels[recvMsg.Channel]
if !ok {
fmt.Println(fmt.Sprintf("No channel registered for %s", recvMsg.Channel))
continue
}
result := make(chan string)
done := make(chan bool)
go s.listenForResults(recvMsg.ID, recvMsg.Channel, result, done, session)
channelHandler(recvMsg.Body, result, done)
}
// Remove session from Vessel.
for i, sess := range s.sessions {
if sess == session {
s.sessions = append(s.sessions[:i], s.sessions[i+1:]...)
break
}
}
}
}
// listenForResults relays values received on c to session, wrapping each one in a
// message that carries the given id and channel. It returns as soon as a value is
// received on done; anything sent on c after that point is never read. Marshaling
// errors are printed and the affected result is skipped.
func (v *sockjsVessel) listenForResults(id, channel string, c <-chan string, done <-chan bool, session sockjs.Session) {
	for {
		var body string
		select {
		case <-done:
			return
		case body = <-c:
		}

		payload, err := v.marshaler.marshal(&message{
			ID:      id,
			Channel: channel,
			Body:    body,
		})
		if err != nil {
			fmt.Println(err)
			continue
		}

		text := string(payload)
		fmt.Println("Send", text)
		session.Send(text)
	}
}
package vessel
import (
"encoding/json"
"fmt"
"strings"
"github.com/mattrobenolt/gocql/uuid"
)
// Channel is a message handler. It receives the body of a message, a send-only
// channel on which to emit results, and a send-only channel on which to signal
// that the handler has completed.
type Channel func(msg string, results chan<- string, done chan<- bool)
// Vessel coordinates communication between clients and the server: it manages the
// registered Channels and processes incoming and outgoing messages.
type Vessel interface {
	// AddChannel registers channel as the handler for the given channel name.
	AddChannel(name string, channel Channel)

	// Start begins serving on the given listen address and reports any error
	// encountered while doing so.
	Start(addr string) error

	// Broadcast sends msg to clients on the named channel.
	Broadcast(channel string, msg string)
}
// message is the envelope exchanged with clients.
type message struct {
	ID      string `json:"id"`      // correlates a result with the request that caused it
	Channel string `json:"channel"` // name of the channel the message is addressed to
	Body    string `json:"body"`    // payload
}

// marshaler converts messages to and from their wire representation.
type marshaler interface {
	unmarshal([]byte) (*message, error)
	marshal(*message) ([]byte, error)
}

// jsonMarshaler is the marshaler that encodes messages as JSON objects.
type jsonMarshaler struct{}

// unmarshal decodes a JSON object into a message. The "id", "channel" and "body"
// keys must all be present and must all hold JSON strings; otherwise an error is
// returned. Input comes from clients, so a wrongly typed field is reported as an
// error rather than triggering a failed type assertion panic.
func (j *jsonMarshaler) unmarshal(msg []byte) (*message, error) {
	var payload map[string]interface{}
	if err := json.Unmarshal(msg, &payload); err != nil {
		return nil, err
	}

	id, err := stringField(payload, "id")
	if err != nil {
		return nil, err
	}
	channel, err := stringField(payload, "channel")
	if err != nil {
		return nil, err
	}
	body, err := stringField(payload, "body")
	if err != nil {
		return nil, err
	}

	return &message{ID: id, Channel: channel, Body: body}, nil
}

// marshal encodes m as a JSON object.
func (j *jsonMarshaler) marshal(m *message) ([]byte, error) {
	return json.Marshal(m)
}

// stringField returns payload[key] as a string, or an error if the key is absent
// or its value is not a JSON string.
func stringField(payload map[string]interface{}, key string) (string, error) {
	raw, ok := payload[key]
	if !ok {
		return "", fmt.Errorf("Message missing %s", key)
	}
	s, ok := raw.(string)
	if !ok {
		return "", fmt.Errorf("Message field %s must be a string, got %T", key, raw)
	}
	return s, nil
}
// nextID returns a new message ID: the string form of a random UUID with every
// hyphen removed.
func nextID() string {
	// Named id rather than uuid so the imported uuid package is not shadowed.
	id := uuid.RandomUUID().String()
	id = strings.Replace(id, "-", "", -1)
	return id
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment