Created
February 3, 2017 00:11
-
-
Save anthonynsimon/aa8d58f36a97d4fcac6ea16c8ab73f85 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"fmt" | |
"log" | |
"net" | |
"io" | |
) | |
// Parser states used by the line parser in handleConnection. Each name
// records how much of a command has been consumed so far; the values are
// consecutive integers starting at 0.
const (
	T_START     = iota // nothing consumed yet
	T_P                // read "p"
	T_PU               // read "pu"
	T_PUB              // read "pub"
	T_PUB_DELIM        // read "pub " and now scanning the topic
	T_PUB_TOPIC        // complete "pub <topic> " prefix recognised
	T_S                // read "s"
	T_SU               // read "su"
	T_SUB              // read "sub"
	T_SUB_DELIM        // read "sub " and now scanning the topic
	T_SUB_TOPIC        // complete "sub <topic>" command recognised
)
var (
	// topics maps a topic name to its message channel. Entries are created
	// on demand by ensureChannel. The map itself is not synchronised.
	topics = make(map[string]chan []byte)

	// uniqueTopic is the single shared queue that publish sends to and that
	// every subscribe goroutine receives from, regardless of topic name.
	// It buffers up to 1024 messages.
	uniqueTopic = make(chan []byte, 1024)
)
func main() { | |
port := 6543 | |
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) | |
if err != nil { | |
log.Fatal(err) | |
} | |
for { | |
conn, err := lis.Accept() | |
if err != nil { | |
log.Println(err) | |
} | |
go handleConnection(conn) | |
} | |
} | |
// wire protocol | |
// sub [topic] | |
// unsub [topic] | |
// pub [topic] [data] | |
func handleConnection(conn net.Conn) { | |
log.Println("established connection") | |
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) | |
defer conn.Close() | |
for { | |
line, err := rw.ReadString('\n') | |
if err == io.EOF { | |
return | |
} | |
if err != nil { | |
return | |
} | |
state := T_START | |
topicStart := -1 | |
topicEnd := -1 | |
PARSER: | |
for i, c := range line { | |
switch state { | |
case T_START: | |
switch c { | |
case 'p', 'P': | |
state = T_P | |
case 's', 'S': | |
state = T_S | |
default: | |
return | |
} | |
case T_P: | |
switch c { | |
case 'u', 'U': | |
state = T_PU | |
default: | |
return | |
} | |
case T_S: | |
switch c { | |
case 'u', 'U': | |
state = T_SU | |
default: | |
return | |
} | |
case T_PU: | |
switch c { | |
case 'b', 'B': | |
state = T_PUB | |
default: | |
return | |
} | |
case T_SU: | |
switch c { | |
case 'b', 'B': | |
state = T_SUB | |
default: | |
return | |
} | |
case T_PUB: | |
switch c { | |
case ' ': | |
topicStart = i+1 | |
state = T_PUB_DELIM | |
default: | |
return | |
} | |
case T_SUB: | |
switch c { | |
case ' ': | |
topicStart = i+1 | |
state = T_SUB_DELIM | |
default: | |
return | |
} | |
case T_PUB_DELIM: | |
switch c { | |
case ' ': | |
topicEnd = i | |
state = T_PUB_TOPIC | |
break PARSER | |
} | |
case T_SUB_DELIM: | |
switch c { | |
case ' ', '\n': | |
topicEnd = i | |
state = T_SUB_TOPIC | |
break PARSER | |
} | |
} | |
} | |
switch state { | |
case T_PUB_TOPIC: | |
publish(line[topicStart:topicEnd], []byte(line[topicEnd+1:]),rw) | |
case T_SUB_TOPIC: | |
subscribe(line[topicStart:topicEnd], rw) | |
default: | |
return | |
} | |
} | |
} | |
func subscribe(topic string, w *bufio.ReadWriter) { | |
// topicChan := ensureChannel(topic) | |
w.Write([]byte("Subscribed to " + topic + "\n")) | |
w.Flush() | |
go func() { | |
for { | |
data := <- uniqueTopic | |
w.Write(data) | |
w.Flush() | |
} | |
}() | |
} | |
func publish(topic string, data []byte, w *bufio.ReadWriter) { | |
// topicChan := ensureChannel(topic) | |
uniqueTopic <- data | |
w.Write([]byte("Published to " + topic + "\n")) | |
w.Flush() | |
} | |
func ensureChannel(topic string) chan []byte { | |
topicChan, ok := topics[topic] | |
if !ok { | |
topicChan = make(chan []byte, 1024) | |
topics[topic] = topicChan | |
} | |
fmt.Printf("got channel for topic: %s - %v\n", topic, topicChan) | |
return topicChan | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment