Created
November 25, 2024 11:07
-
-
Save pmbanugo/7537e824ee2ce07a4507e5f204e0f872 to your computer and use it in GitHub Desktop.
Broadcast multiple websocket client/connection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
// "fmt" | |
"strings" | |
"sync" | |
"ergo.services/ergo/act" | |
"ergo.services/ergo/gen" | |
"ergo.services/meta/websocket" | |
) | |
func factory_MyWebSocketWorker() gen.ProcessBehavior { | |
return &MyWebSocketWorker{} | |
} | |
type MyWebSocketWorker struct { | |
act.Actor | |
//TODO: should this be a map of event names to a slice of aliases? | |
subscriptions map[string]map[string]gen.Alias | |
mu sync.RWMutex | |
} | |
// Init invoked on a start this process. | |
func (w *MyWebSocketWorker) Init(args ...any) error { | |
w.Log().Info("started WebSocket worker process %s with args %v", w.Name(), args) | |
w.subscriptions = make(map[string]map[string]gen.Alias) | |
return nil | |
} | |
var eventList = &sync.Map{} | |
func (w *MyWebSocketWorker) HandleMessage(from gen.PID, message any) error { | |
switch m := message.(type) { | |
case websocket.MessageConnect: | |
w.Log().Info("%s new websocket connection with %s, meta-process %s", w.Name(), m.RemoteAddr.String(), m.ID) | |
reply := websocket.Message{ | |
Body: []byte("hello from " + w.PID().String()), | |
} | |
w.SendAlias(m.ID, reply) | |
case websocket.MessageDisconnect: | |
w.Log().Info("%s disconnected with %s", w.Name(), m.ID) | |
// TODO: remove subscription for this WebSocket connection meta-process. | |
case websocket.Message: | |
received := string(m.Body) | |
strip := strings.TrimRight(received, "\r\n") | |
w.Log().Info("%s got message (meta-process: %s): %s", w.Name(), m.ID, strip) | |
// send echo reply | |
// reply := fmt.Sprintf("OK %s", strip) | |
// m.Body = []byte(reply) | |
// w.SendAlias(m.ID, m) | |
//extract the type of event from the message so we know if to subscribe or publish. it's in the format of: "publish:eventname:message" or "subscribe:eventname" | |
parts := strings.Split(strip, ":") | |
w.Log().Info("parts: %s %s", parts, len(parts)) | |
if len(parts) == 3 && parts[0] == "publish" { | |
w.Log().Info("i'm in publish mode") | |
if token, exists := eventList.Load(parts[1]); exists { | |
w.Log().Info("publishing event: %s", parts[1]) | |
m.Body = []byte(m.ID.String() + ": " + parts[2]) | |
err := w.SendEvent(gen.Atom(parts[1]), *(token.(*gen.Ref)), m) | |
if err != nil { | |
w.Log().Info("unable to publish event: %s", err) | |
m.Body = []byte("unable to publish event") | |
w.SendAlias(m.ID, m) | |
return nil | |
} | |
} else { | |
w.Log().Info("no subscribers for event: %s", parts[1]) | |
} | |
} else if len(parts) == 2 && parts[0] == "subscribe" { | |
w.Log().Info("subscribing to event: %s", parts[1]) | |
evtAtom := gen.Atom(parts[1]) | |
//check if the event is already registered | |
_, exists := eventList.Load(parts[1]) | |
if !exists { | |
w.Log().Info("registering event for: %s", parts[1]) | |
//register the event to the Node. | |
token, err := w.Node().RegisterEvent(evtAtom, gen.EventOptions{Buffer: 10}) | |
if err != nil { | |
w.Log().Info("unable to register event for: %s", err) | |
m.Body = []byte("unable to subscribe to event") | |
w.SendAlias(m.ID, m) | |
return nil | |
} | |
w.Log().Info("registered event for: %s, token: %s", parts[1], token) | |
eventList.Store(parts[1], &token) | |
} | |
//link/subscribe to the event if it's not in the subscriptions map | |
w.mu.RLock() | |
w.Log().Debug("subscriptions: %s", w.subscriptions) | |
if _, ok := w.subscriptions[parts[1]][m.ID.String()]; !ok { | |
lastEvents, err := w.LinkEvent(gen.Event{Name: evtAtom}) | |
if err != nil { | |
w.Log().Info("%s: unable to link event for: %s. %s", m.ID, evtAtom, err) | |
m.Body = []byte("unable to subscribe to event") | |
w.SendAlias(m.ID, m) | |
return nil | |
} | |
w.Log().Info("last events: %s", lastEvents) | |
} | |
w.mu.RUnlock() | |
// add the alias to the subscription map | |
w.mu.Lock() | |
if _, ok := w.subscriptions[parts[1]]; !ok { | |
w.subscriptions[parts[1]] = make(map[string]gen.Alias) | |
} | |
w.subscriptions[parts[1]][m.ID.String()] = m.ID | |
w.mu.Unlock() | |
//inform the client that we subscribed to the event | |
m.Body = []byte("subscribed to event: " + parts[1]) | |
w.SendAlias(m.ID, m) | |
w.Log().Info("%s subscribed to event with %s", m.ID, parts[1]) | |
//send the last events to the client | |
// for _, event := range lastEvents { | |
// w.Log().Info("processing last event: %s", event) | |
// // m.Body = []byte() | |
// w.SendAlias(m.ID, event.Message) | |
// } | |
} | |
default: | |
w.Log().Error("uknown message from %s %#v", from, message) | |
} | |
return nil | |
} | |
func (w *MyWebSocketWorker) HandleEvent(event gen.MessageEvent) error { | |
w.Log().Info("received event %s with value: %#v", event.Event, event.Message) | |
switch m := event.Message.(type) { | |
case websocket.Message: | |
//TODO: send event to all subscribers | |
for _, alias := range w.subscriptions[event.Event.String()] { | |
w.SendAlias(alias, m) | |
} | |
default: | |
w.Log().Error("uknown message type from %s %#v", event.Event, event.Message) | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment