Skip to content

Instantly share code, notes, and snippets.

@gmcquillan
Created October 18, 2013 02:46
Show Gist options
  • Save gmcquillan/7035728 to your computer and use it in GitHub Desktop.
Save gmcquillan/7035728 to your computer and use it in GitHub Desktop.
Whispering Gophers Example Code With UDP Discovery
// Summarily stolen from the Whispering Gophers Example!
//
// Solution to part 8 of the Whispering Gophers code lab.
//
// This program extends part 8.
//
// It connects to the peer specified by -peer.
// It accepts connections from peers and receives messages from them.
// When it sees a peer with an address it hasn't seen before, it makes a
// connection to that peer.
// It adds an ID field containing a random string to each outgoing message.
// When it recevies a message with an ID it hasn't seen before, it broadcasts
// that message to all connected peers.
//
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"os"
"sync"
"code.google.com/p/whispering-gophers/util"
)
var (
peerAddr = flag.String("peer", "", "peer host:port")
self string
discPort int = 5555
)
type Message struct {
ID string
Addr string
Body string
}
func main() {
flag.Parse()
l, err := util.Listen()
if err != nil {
log.Fatal(err)
}
go discoveryListen()
go discoveryClient("192.168.1.121", "3245")
self = l.Addr().String()
log.Println("Listening on", self)
go dial(*peerAddr)
go readInput()
for {
c, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go serve(c)
}
}
var peers = &Peers{m: make(map[string]chan<- Message)}
type Peers struct {
m map[string]chan<- Message
mu sync.RWMutex
}
// Add creates and returns a new channel for the given peer address.
// If an address already exists in the registry, it returns nil.
func (p *Peers) Add(addr string) <-chan Message {
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.m[addr]; ok {
return nil
}
ch := make(chan Message)
p.m[addr] = ch
return ch
}
// Remove deletes the specified peer from the registry.
func (p *Peers) Remove(addr string) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.m, addr)
}
// List returns a slice of all active peer channels.
func (p *Peers) List() []chan<- Message {
p.mu.RLock()
defer p.mu.RUnlock()
l := make([]chan<- Message, 0, len(p.m))
for _, ch := range p.m {
l = append(l, ch)
}
return l
}
func broadcast(m Message) {
for _, ch := range peers.List() {
select {
case ch <- m:
default:
// Okay to drop messages sometimes.
}
}
}
func serve(c net.Conn) {
defer c.Close()
d := json.NewDecoder(c)
for {
var m Message
err := d.Decode(&m)
if err != nil {
log.Println(err)
return
}
if Seen(m.ID) {
continue
}
fmt.Printf("%#v\n", m)
broadcast(m)
go dial(m.Addr)
}
}
func readInput() {
s := bufio.NewScanner(os.Stdin)
for s.Scan() {
m := Message{
ID: util.RandomID(),
Addr: self,
Body: s.Text(),
}
Seen(m.ID)
broadcast(m)
}
if err := s.Err(); err != nil {
log.Fatal(err)
}
}
func dial(addr string) {
if addr == self {
return // Don't try to dial self.
}
ch := peers.Add(addr)
if ch == nil {
return // Peer already connected.
}
defer peers.Remove(addr)
c, err := net.Dial("tcp", addr)
if err != nil {
log.Println(addr, err)
return
}
defer c.Close()
e := json.NewEncoder(c)
for m := range ch {
err := e.Encode(m)
if err != nil {
log.Println(addr, err)
return
}
}
}
var seenIDs = struct {
m map[string]bool
sync.Mutex
}{m: make(map[string]bool)}
// Seen returns true if the specified id has been seen before.
// If not, it returns false and marks the given id as "seen".
func Seen(id string) bool {
seenIDs.Lock()
ok := seenIDs.m[id]
seenIDs.m[id] = true
seenIDs.Unlock()
return ok
}
func discoveryClient(addr string, port string) {
BROADCAST_IPv4 := net.IPv4(255, 255, 255, 255)
socket, err := net.DialUDP("udp4", nil, &net.UDPAddr{
IP: BROADCAST_IPv4,
Port: discPort,
})
if err != nil {
log.Fatal("Couldn't send UDP?!?!")
}
socket.Write([]byte(fmt.Sprintf("%s:%s", addr, port)))
log.Printf("Sent a discovery packet!")
}
func discoveryListen() {
socket, err := net.ListenUDP("udp4", &net.UDPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: discPort,
})
if err != nil {
log.Fatal("Couldn't open UDP?!?")
}
for {
data := make([]byte, 4096)
_, remoteAddr, err := socket.ReadFromUDP(data)
if err != nil {
log.Fatal("Problem reading UDP packet")
}
log.Printf("Adding this address to Peer List: %v", remoteAddr)
peers.Add(string(data))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment