Created
July 26, 2023 16:02
-
-
Save pattanun-np/1a6b7b962047915fc0413e2ad39469b7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package chat | |
import ( | |
"bytes" | |
"fmt" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/gofiber/contrib/websocket" | |
"github.com/nats-io/nats.go" | |
) | |
const ( | |
// Time allowed to write a message to the peer. | |
writeWait = 25 * time.Second | |
// Time allowed to read the next pong message from the peer. | |
pongWait = 120 * time.Second | |
// Send pings to peer with this period. Must be less than pongWait. | |
pingPeriod = (pongWait * 9) / 10 | |
// Maximum message size allowed from peer. | |
maxMessageSize = 512 | |
) | |
var ( | |
newline = []byte{'\n'} | |
space = []byte{' '} | |
) | |
func (ch *ChatHandler) FindRoomORCreate(roomName string) *Room { | |
// Retrieve the room or create a new one if it doesn't exist | |
// | |
log.Println("FindRoomORCreate: ", roomName) | |
if room, ok := ch.rooms[roomName]; ok { | |
return room | |
} | |
room := &Room{ | |
name: roomName, | |
clients: make(map[*websocket.Conn]bool), | |
addCh: make(chan *websocket.Conn), | |
delCh: make(chan *websocket.Conn), | |
sendCh: make(chan []byte), | |
doneCh: make(chan struct{}), | |
} | |
ch.rooms[roomName] = room | |
go room.run(ch) | |
return room | |
} | |
func (room *Room) run(ch *ChatHandler) { | |
// Create a ticker to ping clients every 50 seconds. | |
for { | |
select { | |
case client := <-room.addCh: // Add new client | |
room.clients[client] = true | |
case client := <-room.delCh: // Remove client from the room | |
delete(room.clients, client) | |
if len(room.clients) == 0 { | |
// If the room is empty, remove it from ChatHandler | |
delete(ch.rooms, room.name) // Broadcast the close signal | |
close(room.doneCh) // Broadcast the close signal | |
return | |
} | |
case message := <-room.sendCh: | |
if len(room.clients) > 0 { | |
for client := range room.clients { | |
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) | |
// Send the message to all clients in the room | |
if err := client.WriteMessage(websocket.TextMessage, message); err != nil { | |
log.Println("Error broadcasting message:", err) | |
} | |
} | |
} | |
// Broadcast message to all clients | |
} | |
} | |
} | |
func (ch *ChatHandler) FindRoom(roomName string) *Room { | |
log.Println("FindRoom: ", roomName) | |
// Retrieve the room or create a new one if it doesn't exist | |
// | |
room, ok := ch.rooms[roomName] | |
if !ok { | |
log.Println("Room not found live chat is disabled") | |
return nil | |
} | |
log.Println("Room found: ", roomName) | |
return room | |
} | |
func (ch *ChatHandler) StreamInLiveChat(c *websocket.Conn) { | |
// Check is roomID exist | |
// If existed, emit to all clients | |
// If not exist, create new roomID and emit to all clients | |
userId := c.Params("user_id") | |
botID := c.Params("bot_id") | |
// Continuously read messages from the connection | |
roomName := fmt.Sprintf("room_%s_botid_%s", userId, botID) | |
log.Println("RoomName: ", roomName) | |
room := ch.FindRoomORCreate(roomName) | |
ch.addConnection(room, c) // Add connection to the room | |
go func(room *Room, c *websocket.Conn) { | |
ch.Ping(room, c) | |
}(room, c) | |
ch.ReadPump(roomName, c) | |
} | |
func (ch *ChatHandler) StreamOutLiveChat(c *websocket.Conn) { | |
// Check is roomID exist | |
// If existed, emit to all clients | |
// If not exist, create new roomID and emit to all clients | |
userId := c.Params("user_id") | |
botID := c.Params("bot_id") | |
// Continuously read messages from the connection | |
outRoom := fmt.Sprintf("out_room_%s_botid_%s", userId, botID) | |
log.Println("RoomName: ", outRoom) | |
room := ch.FindRoomORCreate(outRoom) | |
ch.addConnection(room, c) // Add connection to the room | |
go func(room *Room, c *websocket.Conn) { | |
ch.Ping(room, c) | |
}(room, c) | |
ch.ReadPump(outRoom, c) | |
} | |
func (ch *ChatHandler) NotificationChat(c *websocket.Conn) { | |
// Check is roomID exist | |
// If existed, emit to all clients | |
// If not exist, create new roomID and emit to all clients | |
roomName := "notification" | |
room := ch.FindRoomORCreate(roomName) | |
ch.addConnection(room, c) | |
go func(room *Room, c *websocket.Conn) { | |
ch.Ping(room, c) | |
}(room, c) | |
ch.ReadPump(roomName, c) | |
} | |
func (ch *ChatHandler) Ping(room *Room, c *websocket.Conn) { | |
// Start a timer to send ping every 5 seconds | |
// Start a goroutine to ping the client every 5 seconds | |
pingTicker := time.NewTicker(5 * time.Second) | |
defer pingTicker.Stop() | |
for range pingTicker.C { // Loop until the timer channel receives a value | |
if len(room.clients) > 0 { | |
if err := c.WriteMessage(websocket.PingMessage, []byte("")); err != nil { | |
log.Println(err) | |
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { | |
log.Printf("error: %v", err) | |
break | |
} | |
ch.removeConnection(room, c) | |
break | |
} | |
} | |
} | |
pingTicker.Stop() | |
// // Read messages from the client | |
} | |
// ReadPump listens for incoming messages from the WebSocket connection | |
func (ch *ChatHandler) ReadPump(subject string, c *websocket.Conn) { | |
log.Println("New connection added") | |
// Keep the main goroutine running | |
// Start a timer to send ping every 5 seconds | |
c.SetReadLimit(maxMessageSize) | |
err := c.SetReadDeadline(time.Now().Add(pongWait)) | |
if err != nil { | |
return | |
} | |
c.SetPongHandler(func(string) error { | |
err := c.SetReadDeadline(time.Now().Add(pongWait)) | |
if err != nil { | |
return err | |
} | |
return nil | |
}) | |
// Subscribe to a subject/topic | |
room := ch.FindRoom(subject) | |
sub, _ := ch.repository.NATSRepository.Subscribe(subject, func(msg *nats.Msg) { | |
message := bytes.TrimSpace(bytes.Replace(msg.Data, newline, space, -1)) | |
if err := c.WriteMessage(websocket.TextMessage, message); err != nil { | |
log.Println(err) | |
ch.removeConnection(room, c) | |
return | |
} | |
}) | |
// Read messages from NATS and write to the socket | |
defer func(sub *nats.Subscription) { | |
err := sub.Unsubscribe() | |
if err != nil { | |
return | |
} | |
}(sub) | |
// // Set up a signal handler to gracefully exit the program | |
sigCh := make(chan os.Signal, 1) | |
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) // Register os.Interrupt and syscall.SIGTERM to sigCh | |
go func(room *Room, c *websocket.Conn) { | |
<-sigCh | |
err := sub.Unsubscribe() // Unsubscribe from the subject | |
if err != nil { | |
return | |
} | |
ch.repository.NATSRepository.Close() // Close the connection to NATS | |
ch.removeConnection(room, c) | |
os.Exit(0) | |
// Exit the program | |
}(room, c) | |
// Publish message to the subject | |
// | |
select {} | |
} | |
func (ch *ChatHandler) addConnection(room *Room, c *websocket.Conn) { | |
log.Println("Add connection to room: ", room.name) | |
log.Println("Add connection to room: ", c.RemoteAddr().String()) | |
// Add the new client to the room by sending it over the addCh channel | |
room.addCh <- c // Send the client to the addCh channel | |
} | |
func (ch *ChatHandler) removeConnection(room *Room, c *websocket.Conn) { | |
// Remove the client from the room by sending it over the delCh channel | |
room.delCh <- c // Send the client to the delCh channel | |
} | |
func (ch *ChatHandler) broadcast(room *Room, message []byte) { | |
// Broadcast message to all clients | |
log.Println("Broadcast message: ") | |
// log.Println(utils.PrettyFormatStruct(string(message))) | |
// Send the message to all the clients in the room by sending it over the sendCh channel | |
room.sendCh <- message // Send the message to the sendCh channel | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment