Skip to content

Instantly share code, notes, and snippets.

@pmbanugo
Created November 25, 2024 11:07
Show Gist options
  • Save pmbanugo/7537e824ee2ce07a4507e5f204e0f872 to your computer and use it in GitHub Desktop.
Save pmbanugo/7537e824ee2ce07a4507e5f204e0f872 to your computer and use it in GitHub Desktop.
Broadcast multiple websocket client/connection
package main
import (
// "fmt"
"strings"
"sync"
"ergo.services/ergo/act"
"ergo.services/ergo/gen"
"ergo.services/meta/websocket"
)
func factory_MyWebSocketWorker() gen.ProcessBehavior {
return &MyWebSocketWorker{}
}
type MyWebSocketWorker struct {
act.Actor
//TODO: should this be a map of event names to a slice of aliases?
subscriptions map[string]map[string]gen.Alias
mu sync.RWMutex
}
// Init invoked on a start this process.
func (w *MyWebSocketWorker) Init(args ...any) error {
w.Log().Info("started WebSocket worker process %s with args %v", w.Name(), args)
w.subscriptions = make(map[string]map[string]gen.Alias)
return nil
}
var eventList = &sync.Map{}
func (w *MyWebSocketWorker) HandleMessage(from gen.PID, message any) error {
switch m := message.(type) {
case websocket.MessageConnect:
w.Log().Info("%s new websocket connection with %s, meta-process %s", w.Name(), m.RemoteAddr.String(), m.ID)
reply := websocket.Message{
Body: []byte("hello from " + w.PID().String()),
}
w.SendAlias(m.ID, reply)
case websocket.MessageDisconnect:
w.Log().Info("%s disconnected with %s", w.Name(), m.ID)
// TODO: remove subscription for this WebSocket connection meta-process.
case websocket.Message:
received := string(m.Body)
strip := strings.TrimRight(received, "\r\n")
w.Log().Info("%s got message (meta-process: %s): %s", w.Name(), m.ID, strip)
// send echo reply
// reply := fmt.Sprintf("OK %s", strip)
// m.Body = []byte(reply)
// w.SendAlias(m.ID, m)
//extract the type of event from the message so we know if to subscribe or publish. it's in the format of: "publish:eventname:message" or "subscribe:eventname"
parts := strings.Split(strip, ":")
w.Log().Info("parts: %s %s", parts, len(parts))
if len(parts) == 3 && parts[0] == "publish" {
w.Log().Info("i'm in publish mode")
if token, exists := eventList.Load(parts[1]); exists {
w.Log().Info("publishing event: %s", parts[1])
m.Body = []byte(m.ID.String() + ": " + parts[2])
err := w.SendEvent(gen.Atom(parts[1]), *(token.(*gen.Ref)), m)
if err != nil {
w.Log().Info("unable to publish event: %s", err)
m.Body = []byte("unable to publish event")
w.SendAlias(m.ID, m)
return nil
}
} else {
w.Log().Info("no subscribers for event: %s", parts[1])
}
} else if len(parts) == 2 && parts[0] == "subscribe" {
w.Log().Info("subscribing to event: %s", parts[1])
evtAtom := gen.Atom(parts[1])
//check if the event is already registered
_, exists := eventList.Load(parts[1])
if !exists {
w.Log().Info("registering event for: %s", parts[1])
//register the event to the Node.
token, err := w.Node().RegisterEvent(evtAtom, gen.EventOptions{Buffer: 10})
if err != nil {
w.Log().Info("unable to register event for: %s", err)
m.Body = []byte("unable to subscribe to event")
w.SendAlias(m.ID, m)
return nil
}
w.Log().Info("registered event for: %s, token: %s", parts[1], token)
eventList.Store(parts[1], &token)
}
//link/subscribe to the event if it's not in the subscriptions map
w.mu.RLock()
w.Log().Debug("subscriptions: %s", w.subscriptions)
if _, ok := w.subscriptions[parts[1]][m.ID.String()]; !ok {
lastEvents, err := w.LinkEvent(gen.Event{Name: evtAtom})
if err != nil {
w.Log().Info("%s: unable to link event for: %s. %s", m.ID, evtAtom, err)
m.Body = []byte("unable to subscribe to event")
w.SendAlias(m.ID, m)
return nil
}
w.Log().Info("last events: %s", lastEvents)
}
w.mu.RUnlock()
// add the alias to the subscription map
w.mu.Lock()
if _, ok := w.subscriptions[parts[1]]; !ok {
w.subscriptions[parts[1]] = make(map[string]gen.Alias)
}
w.subscriptions[parts[1]][m.ID.String()] = m.ID
w.mu.Unlock()
//inform the client that we subscribed to the event
m.Body = []byte("subscribed to event: " + parts[1])
w.SendAlias(m.ID, m)
w.Log().Info("%s subscribed to event with %s", m.ID, parts[1])
//send the last events to the client
// for _, event := range lastEvents {
// w.Log().Info("processing last event: %s", event)
// // m.Body = []byte()
// w.SendAlias(m.ID, event.Message)
// }
}
default:
w.Log().Error("uknown message from %s %#v", from, message)
}
return nil
}
func (w *MyWebSocketWorker) HandleEvent(event gen.MessageEvent) error {
w.Log().Info("received event %s with value: %#v", event.Event, event.Message)
switch m := event.Message.(type) {
case websocket.Message:
//TODO: send event to all subscribers
for _, alias := range w.subscriptions[event.Event.String()] {
w.SendAlias(alias, m)
}
default:
w.Log().Error("uknown message type from %s %#v", event.Event, event.Message)
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment