Skip to content

Instantly share code, notes, and snippets.

@shunsukeaihara
Last active December 20, 2018 09:09
Show Gist options
  • Save shunsukeaihara/5cab353b4bd75b614b4c to your computer and use it in GitHub Desktop.
Save shunsukeaihara/5cab353b4bd75b614b4c to your computer and use it in GitHub Desktop.
long poll chat server in go and redis
// chatserver project main.go
package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/golang/glog"
"github.com/zenazn/goji"
"github.com/zenazn/goji/web"
"gopkg.in/redis.v2"
"net/http"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
)
const pub_channel = "pub"
type JsonResponse struct {
Status string
Msgs []string
}
type State struct {
mutex sync.Mutex
rooms map[string][]chan bool
}
func (s *State) AppendChan(key string, ch chan bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
if room, ok := s.rooms[key]; ok {
s.rooms[key] = append(room, ch)
} else {
s.rooms[key] = []chan bool{ch}
}
}
func (s *State) GetAndResetChan(key string) []chan bool {
s.mutex.Lock()
defer s.mutex.Unlock()
if room, ok := s.rooms[key]; ok {
s.rooms[key] = []chan bool{}
return room
} else {
return []chan bool{}
}
}
func (s *State) RereaseAll() {
s.mutex.Lock()
defer s.mutex.Unlock()
for key, room := range s.rooms {
for _, ch := range room {
ch <- true
}
delete(s.rooms, key)
}
}
var global_state = State{rooms: map[string][]chan bool{}}
var client *redis.Client
func init_client() {
client = redis.NewTCPClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
}
func responseSuccess(w http.ResponseWriter) {
bdata, err := json.Marshal(JsonResponse{Status: "success"})
if err == nil {
fmt.Fprint(w, string(bdata))
}
}
func responseError(w http.ResponseWriter, msg string) {
bdata, err := json.Marshal(JsonResponse{Status: "error", Msgs: []string{msg}})
if err == nil {
fmt.Fprint(w, string(bdata))
}
}
func postMessage(c web.C, w http.ResponseWriter, r *http.Request) {
roomid := c.URLParams["roomid"]
userid := c.URLParams["uid"]
is_joined, err := client.SIsMember(roomid, userid).Result()
if (err != nil) || (is_joined != true) {
responseError(w, "you did not join this room")
return
}
post_msg := r.FormValue("msg")
glog.Info(post_msg)
users, err := client.SMembers(roomid).Result()
if err != nil {
responseError(w, "could not send message")
return
}
_, err2 := client.Pipelined(func(c *redis.Pipeline) error {
//save massage for joined users
for _, joined := range users {
key := roomid + ":" + joined
glog.Info(key)
c.RPush(key, post_msg)
}
return nil
})
if err2 != nil {
responseError(w, "Could not post message")
return
}
//publish roomid
pub := client.Publish(pub_channel, roomid)
_ = pub.Err()
glog.Infof("%s publish %s", userid, roomid)
responseSuccess(w)
}
func getMsgFromRedis(roomid string, userid string) []string {
var msgs *redis.StringSliceCmd
_, err := client.Pipelined(func(c *redis.Pipeline) error {
key := roomid + ":" + userid
msgs = c.LRange(key, 0, -1)
c.Del(key)
return nil
})
if err == nil {
glog.Info(msgs)
return msgs.Val()
} else {
return []string{}
}
}
func getMessage(c web.C, w http.ResponseWriter, r *http.Request) {
roomid := c.URLParams["roomid"]
userid := c.URLParams["uid"]
is_joined, err := client.SIsMember(roomid, userid).Result()
if (err != nil) || (is_joined != true) {
responseError(w, "you did not join this room")
return
}
glog.Infof("%s waiting in room %s", userid, roomid)
msgs := getMsgFromRedis(roomid, userid)
var ret JsonResponse
if len(msgs) == 0 {
ch := make(chan bool)
global_state.AppendChan(roomid, ch)
// waiting post massage for this room
<-ch
// read room messages from redis
msgs := getMsgFromRedis(roomid, userid)
glog.Info(msgs)
ret = JsonResponse{Msgs: msgs, Status: "success"}
} else {
// jsonify
ret = JsonResponse{Msgs: msgs, Status: "success"}
}
glog.Info(ret)
bdata, err := json.Marshal(ret)
if err == nil {
fmt.Fprint(w, string(bdata))
}
}
func joinRoom(c web.C, w http.ResponseWriter, r *http.Request) {
roomid := c.URLParams["roomid"]
userid := c.URLParams["uid"]
glog.Infof("join %s to %s", userid, roomid)
client.SAdd(roomid, userid)
responseSuccess(w)
}
func leaveRoom(c web.C, w http.ResponseWriter, r *http.Request) {
roomid := c.URLParams["roomid"]
userid := c.URLParams["uid"]
glog.Infof("leave %s from %s", userid, roomid)
client.SRem(roomid, userid)
client.Del(roomid + ":" + userid)
responseSuccess(w)
}
func receive(pubsub *redis.PubSub) {
for {
msg, err := pubsub.Receive()
if err == nil {
switch t := msg.(type) {
default:
glog.Infof("unexpected type %T", t)
case *redis.Message:
glog.Infof("mssage %T", t)
roomid := msg.(*redis.Message).Payload
glog.Infof("receive message for %s", roomid)
room := global_state.GetAndResetChan(roomid)
for _, ch := range room {
ch <- true
}
case *redis.Subscription:
glog.Infof("subscription %T", t)
count := msg.(*redis.Subscription).Count
if count == 0 {
break
}
}
}
}
}
func subscribe(signal_chan chan os.Signal) {
// waiting pubsub
pubsub := client.PubSub()
defer pubsub.Close()
err := pubsub.Subscribe(pub_channel)
if err != nil {
panic(err)
}
go receive(pubsub)
<-signal_chan
//shutdown
global_state.RereaseAll()
pubsub.PUnsubscribe("*")
}
func main() {
flag.Parse()
cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
glog.Infof("cpu num: %d", cpus)
init_client()
signal_chan := make(chan os.Signal, 1)
signal.Notify(signal_chan,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
go subscribe(signal_chan)
goji.Get("/room/:roomid/join/:uid", joinRoom)
goji.Get("/room/:roomid/leave/:uid", leaveRoom)
goji.Post("/room/:roomid/post/:uid", postMessage)
goji.Get("/room/:roomid/get/:uid", getMessage)
goji.Serve()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment