Last active
December 20, 2018 09:09
-
-
Save shunsukeaihara/5cab353b4bd75b614b4c to your computer and use it in GitHub Desktop.
long poll chat server in go and redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// chatserver project main.go | |
package main | |
import ( | |
"encoding/json" | |
"flag" | |
"fmt" | |
"github.com/golang/glog" | |
"github.com/zenazn/goji" | |
"github.com/zenazn/goji/web" | |
"gopkg.in/redis.v2" | |
"net/http" | |
"os" | |
"os/signal" | |
"runtime" | |
"sync" | |
"syscall" | |
) | |
const pub_channel = "pub" | |
type JsonResponse struct { | |
Status string | |
Msgs []string | |
} | |
type State struct { | |
mutex sync.Mutex | |
rooms map[string][]chan bool | |
} | |
func (s *State) AppendChan(key string, ch chan bool) { | |
s.mutex.Lock() | |
defer s.mutex.Unlock() | |
if room, ok := s.rooms[key]; ok { | |
s.rooms[key] = append(room, ch) | |
} else { | |
s.rooms[key] = []chan bool{ch} | |
} | |
} | |
func (s *State) GetAndResetChan(key string) []chan bool { | |
s.mutex.Lock() | |
defer s.mutex.Unlock() | |
if room, ok := s.rooms[key]; ok { | |
s.rooms[key] = []chan bool{} | |
return room | |
} else { | |
return []chan bool{} | |
} | |
} | |
func (s *State) RereaseAll() { | |
s.mutex.Lock() | |
defer s.mutex.Unlock() | |
for key, room := range s.rooms { | |
for _, ch := range room { | |
ch <- true | |
} | |
delete(s.rooms, key) | |
} | |
} | |
var global_state = State{rooms: map[string][]chan bool{}} | |
var client *redis.Client | |
func init_client() { | |
client = redis.NewTCPClient(&redis.Options{ | |
Addr: "127.0.0.1:6379", | |
}) | |
} | |
func responseSuccess(w http.ResponseWriter) { | |
bdata, err := json.Marshal(JsonResponse{Status: "success"}) | |
if err == nil { | |
fmt.Fprint(w, string(bdata)) | |
} | |
} | |
func responseError(w http.ResponseWriter, msg string) { | |
bdata, err := json.Marshal(JsonResponse{Status: "error", Msgs: []string{msg}}) | |
if err == nil { | |
fmt.Fprint(w, string(bdata)) | |
} | |
} | |
func postMessage(c web.C, w http.ResponseWriter, r *http.Request) { | |
roomid := c.URLParams["roomid"] | |
userid := c.URLParams["uid"] | |
is_joined, err := client.SIsMember(roomid, userid).Result() | |
if (err != nil) || (is_joined != true) { | |
responseError(w, "you did not join this room") | |
return | |
} | |
post_msg := r.FormValue("msg") | |
glog.Info(post_msg) | |
users, err := client.SMembers(roomid).Result() | |
if err != nil { | |
responseError(w, "could not send message") | |
return | |
} | |
_, err2 := client.Pipelined(func(c *redis.Pipeline) error { | |
//save massage for joined users | |
for _, joined := range users { | |
key := roomid + ":" + joined | |
glog.Info(key) | |
c.RPush(key, post_msg) | |
} | |
return nil | |
}) | |
if err2 != nil { | |
responseError(w, "Could not post message") | |
return | |
} | |
//publish roomid | |
pub := client.Publish(pub_channel, roomid) | |
_ = pub.Err() | |
glog.Infof("%s publish %s", userid, roomid) | |
responseSuccess(w) | |
} | |
func getMsgFromRedis(roomid string, userid string) []string { | |
var msgs *redis.StringSliceCmd | |
_, err := client.Pipelined(func(c *redis.Pipeline) error { | |
key := roomid + ":" + userid | |
msgs = c.LRange(key, 0, -1) | |
c.Del(key) | |
return nil | |
}) | |
if err == nil { | |
glog.Info(msgs) | |
return msgs.Val() | |
} else { | |
return []string{} | |
} | |
} | |
func getMessage(c web.C, w http.ResponseWriter, r *http.Request) { | |
roomid := c.URLParams["roomid"] | |
userid := c.URLParams["uid"] | |
is_joined, err := client.SIsMember(roomid, userid).Result() | |
if (err != nil) || (is_joined != true) { | |
responseError(w, "you did not join this room") | |
return | |
} | |
glog.Infof("%s waiting in room %s", userid, roomid) | |
msgs := getMsgFromRedis(roomid, userid) | |
var ret JsonResponse | |
if len(msgs) == 0 { | |
ch := make(chan bool) | |
global_state.AppendChan(roomid, ch) | |
// waiting post massage for this room | |
<-ch | |
// read room messages from redis | |
msgs := getMsgFromRedis(roomid, userid) | |
glog.Info(msgs) | |
ret = JsonResponse{Msgs: msgs, Status: "success"} | |
} else { | |
// jsonify | |
ret = JsonResponse{Msgs: msgs, Status: "success"} | |
} | |
glog.Info(ret) | |
bdata, err := json.Marshal(ret) | |
if err == nil { | |
fmt.Fprint(w, string(bdata)) | |
} | |
} | |
func joinRoom(c web.C, w http.ResponseWriter, r *http.Request) { | |
roomid := c.URLParams["roomid"] | |
userid := c.URLParams["uid"] | |
glog.Infof("join %s to %s", userid, roomid) | |
client.SAdd(roomid, userid) | |
responseSuccess(w) | |
} | |
func leaveRoom(c web.C, w http.ResponseWriter, r *http.Request) { | |
roomid := c.URLParams["roomid"] | |
userid := c.URLParams["uid"] | |
glog.Infof("leave %s from %s", userid, roomid) | |
client.SRem(roomid, userid) | |
client.Del(roomid + ":" + userid) | |
responseSuccess(w) | |
} | |
func receive(pubsub *redis.PubSub) { | |
for { | |
msg, err := pubsub.Receive() | |
if err == nil { | |
switch t := msg.(type) { | |
default: | |
glog.Infof("unexpected type %T", t) | |
case *redis.Message: | |
glog.Infof("mssage %T", t) | |
roomid := msg.(*redis.Message).Payload | |
glog.Infof("receive message for %s", roomid) | |
room := global_state.GetAndResetChan(roomid) | |
for _, ch := range room { | |
ch <- true | |
} | |
case *redis.Subscription: | |
glog.Infof("subscription %T", t) | |
count := msg.(*redis.Subscription).Count | |
if count == 0 { | |
break | |
} | |
} | |
} | |
} | |
} | |
func subscribe(signal_chan chan os.Signal) { | |
// waiting pubsub | |
pubsub := client.PubSub() | |
defer pubsub.Close() | |
err := pubsub.Subscribe(pub_channel) | |
if err != nil { | |
panic(err) | |
} | |
go receive(pubsub) | |
<-signal_chan | |
//shutdown | |
global_state.RereaseAll() | |
pubsub.PUnsubscribe("*") | |
} | |
func main() { | |
flag.Parse() | |
cpus := runtime.NumCPU() | |
runtime.GOMAXPROCS(cpus) | |
glog.Infof("cpu num: %d", cpus) | |
init_client() | |
signal_chan := make(chan os.Signal, 1) | |
signal.Notify(signal_chan, | |
syscall.SIGHUP, | |
syscall.SIGINT, | |
syscall.SIGTERM, | |
syscall.SIGQUIT) | |
go subscribe(signal_chan) | |
goji.Get("/room/:roomid/join/:uid", joinRoom) | |
goji.Get("/room/:roomid/leave/:uid", leaveRoom) | |
goji.Post("/room/:roomid/post/:uid", postMessage) | |
goji.Get("/room/:roomid/get/:uid", getMessage) | |
goji.Serve() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment