Created
September 20, 2014 00:26
-
-
Save tylertreat-wf/e707f1ba808fb850949d to your computer and use it in GitHub Desktop.
vessel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"time" | |
"github.com/tylertreat-wf/vessel/vessel" | |
) | |
func main() { | |
vessel := vessel.NewSockJSVessel("/vessel") | |
vessel.AddChannel("foo", func(msg string, c chan<- string, done chan<- bool) { | |
c <- "ping" | |
done <- true | |
}) | |
go func() { | |
c := time.Tick(5 * time.Second) | |
for { | |
<-c | |
vessel.Broadcast("baz", "testing 123") | |
} | |
}() | |
vessel.Start(":8081") | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vessel | |
import ( | |
"fmt" | |
"net/http" | |
"github.com/igm/sockjs-go/sockjs" | |
) | |
type sockjsVessel struct { | |
uri string | |
sessions []sockjs.Session | |
channels map[string]Channel | |
marshaler marshaler | |
} | |
func NewSockJSVessel(uri string) Vessel { | |
return &sockjsVessel{ | |
uri: uri, | |
channels: map[string]Channel{}, | |
sessions: []sockjs.Session{}, | |
marshaler: &jsonMarshaler{}, | |
} | |
} | |
func (v *sockjsVessel) AddChannel(name string, channel Channel) { | |
v.channels[name] = channel | |
} | |
func (v *sockjsVessel) Start(portStr string) error { | |
handler := sockjs.NewHandler(v.uri, sockjs.DefaultOptions, v.handler()) | |
return http.ListenAndServe(portStr, handler) | |
} | |
func (s *sockjsVessel) Broadcast(channel string, msg string) { | |
m := &message{ | |
ID: nextID(), | |
Channel: channel, | |
Body: msg, | |
} | |
for _, session := range s.sessions { | |
if send, err := s.marshaler.marshal(m); err != nil { | |
fmt.Println(err) | |
} else { | |
sendStr := string(send) | |
fmt.Println("Send", sendStr) | |
session.Send(sendStr) | |
} | |
} | |
} | |
func (s *sockjsVessel) handler() func(sockjs.Session) { | |
return func(session sockjs.Session) { | |
s.sessions = append(s.sessions, session) | |
for { | |
msg, err := session.Recv() | |
if err != nil { | |
fmt.Println(err) | |
break | |
} | |
fmt.Println("Recv", msg) | |
recvMsg, err := s.marshaler.unmarshal([]byte(msg)) | |
if err != nil { | |
fmt.Println(err) | |
continue | |
} | |
channelHandler, ok := s.channels[recvMsg.Channel] | |
if !ok { | |
fmt.Println(fmt.Sprintf("No channel registered for %s", recvMsg.Channel)) | |
continue | |
} | |
result := make(chan string) | |
done := make(chan bool) | |
go s.listenForResults(recvMsg.ID, recvMsg.Channel, result, done, session) | |
channelHandler(recvMsg.Body, result, done) | |
} | |
// Remove session from Vessel. | |
for i, sess := range s.sessions { | |
if sess == session { | |
s.sessions = append(s.sessions[:i], s.sessions[i+1:]...) | |
break | |
} | |
} | |
} | |
} | |
func (s *sockjsVessel) listenForResults(id, channel string, c <-chan string, done <-chan bool, session sockjs.Session) { | |
for { | |
select { | |
case <-done: | |
return | |
case result := <-c: | |
sendMsg := &message{ | |
ID: id, | |
Channel: channel, | |
Body: result, | |
} | |
if send, err := s.marshaler.marshal(sendMsg); err != nil { | |
fmt.Println(err) | |
} else { | |
sendStr := string(send) | |
fmt.Println("Send", sendStr) | |
session.Send(sendStr) | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vessel | |
import ( | |
"encoding/json" | |
"fmt" | |
"strings" | |
"github.com/mattrobenolt/gocql/uuid" | |
) | |
// Channel is a function which takes a message, a channel for sending results, and a channel | |
// for signaling that the handler has completed. | |
type Channel func(string, chan<- string, chan<- bool) | |
// Vessel coordinates communication between clients and server. It's responsible for managing | |
// Channels and processing incoming and outgoing messages. | |
type Vessel interface { | |
AddChannel(string, Channel) | |
Start(string) error | |
Broadcast(string, string) | |
} | |
type message struct { | |
ID string `json:"id"` | |
Channel string `json:"channel"` | |
Body string `json:"body"` | |
} | |
type marshaler interface { | |
unmarshal([]byte) (*message, error) | |
marshal(*message) ([]byte, error) | |
} | |
type jsonMarshaler struct{} | |
func (j *jsonMarshaler) unmarshal(msg []byte) (*message, error) { | |
var payload map[string]interface{} | |
if err := json.Unmarshal(msg, &payload); err != nil { | |
return nil, err | |
} | |
id, ok := payload["id"] | |
if !ok { | |
return nil, fmt.Errorf("Message missing id") | |
} | |
channel, ok := payload["channel"] | |
if !ok { | |
return nil, fmt.Errorf("Message missing channel") | |
} | |
body, ok := payload["body"] | |
if !ok { | |
return nil, fmt.Errorf("Message missing body") | |
} | |
message := &message{ | |
ID: id.(string), | |
Channel: channel.(string), | |
Body: body.(string), | |
} | |
return message, nil | |
} | |
func (j *jsonMarshaler) marshal(message *message) ([]byte, error) { | |
return json.Marshal(message) | |
} | |
func nextID() string { | |
uuid := uuid.RandomUUID().String() | |
return strings.Replace(uuid, "-", "", -1) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment