Skip to content

Instantly share code, notes, and snippets.

@swdunlop
Created December 27, 2010 23:56
Show Gist options
  • Save swdunlop/756711 to your computer and use it in GitHub Desktop.
Save swdunlop/756711 to your computer and use it in GitHub Desktop.
A message wall written in Go that uses Redis Publish and Subscribe to create a persistent real-time message service.
// Copyright (C) 2010, Scott W. Dunlop <swdunlop at gmail.com>
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package main
import "redis"
import "fmt"
import "os"
import "strconv"
var db redis.Client
func init() {
db.Addr = "127.0.0.1:6379"
}
func main() {
var err os.Error = nil
n := len(os.Args)
if n < 2 {
printUsage(1)
}
v := os.Args[1]
n -= 2
switch {
case v == "wait" && n == 0:
err = doWaitMessages()
case v == "flush" && n == 0:
err = doFlushAll()
case v == "read" && n == 0:
err = doReadAll()
case v == "read" && n == 1:
err = doReadFrom(os.Args[2])
case v == "post" && n == 1:
err = doPostMessage(os.Args[2])
default:
printUsage(2)
}
if err != nil {
fmt.Println("error:", err)
os.Exit(3)
}
}
func printUsage(code int) {
fmt.Println("USAGE: wall post <message>")
fmt.Println(" wall read <offset>")
fmt.Println(" wall read")
fmt.Println(" wall flush")
fmt.Println(" wall wait")
os.Exit(code)
}
func doWaitMessages() os.Error {
msg, err := waitMessage()
if err != nil {
return err
}
fmt.Println("--", msg)
return nil
}
func doFlushAll() os.Error {
return flushMessages(0)
}
func doReadAll() os.Error {
return readFrom(0)
}
func doReadFrom(arg0 string) os.Error {
start, err := strconv.Atoi64(string(arg0))
if err != nil {
return err
}
if start < 0 {
return os.NewError("minimum offset for read is 0")
}
return readFrom(start)
}
func doPostMessage(arg0 string) os.Error {
return postMessage(arg0)
}
func waitMessage() (string, os.Error) {
var mon Monitor
mon.start("msg:evt")
defer mon.stop()
return mon.next(), nil
}
func flushMessages(ct int64) os.Error {
return set_db_int("ct", ct)
}
func readFrom(start int64) os.Error {
var mon Monitor
mon.start("msg:evt")
defer mon.stop()
ct, err := get_db_int("ct")
if err != nil {
return err
}
for i := start; i < ct; i++ {
key := fmt.Sprint("msg:", i)
msg, err := get_db_str(key)
if err != nil {
return err
}
fmt.Println("--", msg)
mon.stop() // Got a result, we can stop monitoring.
}
for {
msg := mon.next()
if msg == "" { // Looks like we were closed.
break
}
fmt.Println("--", msg)
mon.stop() // Got a result, we can stop monitoring.
}
return nil
}
func postMessage(msg string) os.Error {
err := pub_db_str("msg:evt", msg)
if err != nil {
return err
}
ix, err := db.Incr("ct")
if err != nil {
return err
}
key := fmt.Sprint("msg:", ix-1)
return set_db_str(key, msg)
}
func get_db_int(key string) (int64, os.Error) {
buf, err := db.Get(key)
if err != nil {
return 0, err
}
return strconv.Atoi64(string(buf))
}
func get_db_str(key string) (string, os.Error) {
buf, err := db.Get(key)
if err != nil {
return "", err
}
return string(buf), nil
}
func set_db_str(key string, val string) os.Error {
err := db.Set(key, []byte(val))
if err != nil {
return err
}
return nil
}
func set_db_int(key string, val int64) os.Error {
err := db.Set(key, []byte(fmt.Sprint(val)))
if err != nil {
return err
}
return nil
}
func pub_db_str(key string, val string) os.Error {
return db.Publish(key, []byte(val))
}
// Used to subscribe to an update channel, providing overwatch for high
// latency polling. (Such as long polling with XmlHttpRequest.)
type Monitor struct {
evt chan redis.Message
sub chan string
stopped bool
}
func (m *Monitor) start(key string) {
m.evt = make(chan redis.Message, 0)
m.sub = make(chan string, 1)
m.sub <- key
m.stopped = false
go db.Subscribe(m.sub, nil, nil, nil, m.evt)
}
func (m *Monitor) stop() {
if !m.stopped {
m.stopped = true
close(m.evt)
close(m.sub)
}
}
func (m *Monitor) next() string {
return string((<-m.evt).Message)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment