Last active
December 30, 2015 08:17
-
-
Save supershabam/6a2222556ffd2ae1a381 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import "fmt" | |
import "sync" | |
import "time" | |
import "log" | |
type IDMesg struct { | |
ID string | |
Mesg string | |
} | |
type Prog struct { | |
Done bool | |
Timer *time.Timer | |
In chan IDMesg | |
Out chan IDMesg | |
Ch map[string]chan string | |
M sync.RWMutex | |
} | |
func MakeProg() *Prog { | |
return &Prog{ | |
Timer: time.NewTimer(time.Millisecond), | |
In: make(chan IDMesg), | |
Out: make(chan IDMesg), | |
Ch: make(map[string]chan string), | |
} | |
} | |
func (p Prog) Connect(id string, in <-chan string) (<-chan IDMesg, error) { | |
p.M.Lock() | |
defer p.M.Unlock() | |
p.Timer.Reset(time.Hour * 8760) | |
// TODO error if program has terminated and we're attempting to connect a new channel | |
if p.Done { | |
return nil, fmt.Errorf("program has terminated") | |
} | |
if _, ok := p.Ch[id]; ok { | |
return nil, fmt.Errorf("id already connected") | |
} | |
ch := make(chan string, 0) | |
p.Ch[id] = ch | |
out := make(chan IDMesg) | |
done := make(chan struct{}) | |
var once sync.Once | |
go func() { | |
wg := sync.WaitGroup{} | |
wg.Add(2) | |
// drain in | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case <-done: | |
return | |
case m, ok := <-in: | |
if !ok { | |
// caller disconnected | |
once.Do(func() { | |
close(done) | |
}) | |
return | |
} | |
select { | |
case <-done: | |
return | |
case p.In <- IDMesg{ | |
ID: id, | |
Mesg: m, | |
}: | |
} | |
} | |
} | |
}() | |
// drain ch | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case <-done: | |
return | |
case m, ok := <-ch: | |
if !ok { | |
// caller disconnected | |
once.Do(func() { | |
close(done) | |
}) | |
return | |
} | |
select { | |
case <-done: | |
return | |
case out <- IDMesg{ | |
ID: id, | |
Mesg: m, | |
}: | |
} | |
} | |
} | |
}() | |
wg.Wait() | |
p.M.Lock() | |
close(p.Ch[id]) | |
delete(p.Ch, id) | |
// setup cleanup timer if you're the last one out | |
if len(p.Ch) == 0 { | |
p.Timer.Reset(time.Second) | |
} | |
p.M.Unlock() | |
close(out) | |
}() | |
return out, nil | |
} | |
func (p Prog) Run() error { | |
go func() { | |
p.Out <- IDMesg{ID: "1", Mesg: "HI"} | |
p.Out <- IDMesg{ID: "2", Mesg: "HI"} | |
p.Out <- IDMesg{ID: "1", Mesg: "HI2"} | |
close(p.Out) | |
}() | |
go func() { | |
for m := range p.Out { | |
p.M.RLock() | |
if ch, ok := p.Ch[m.ID]; ok { | |
select { | |
case ch <- m.Mesg: | |
default: // ch buffer is full - let's handle it | |
select { | |
case drop := <-ch: // pull one item off front of ch | |
log.Printf("dropping message (%s) because ch is busy", drop) | |
ch <- m.Mesg // we are the only goroutine writing into ch, so we won't block here | |
default: // ch buffer self-drained and didn't have an item to pop off the front | |
} | |
} | |
} | |
p.M.RUnlock() | |
} | |
}() | |
for { | |
select { | |
case m, ok := <-p.In: | |
if !ok { | |
return nil | |
} | |
fmt.Printf("writing to program: %+v\n", m) | |
case <-p.Timer.C: | |
p.M.Lock() | |
p.Done = true | |
close(p.In) | |
p.M.Unlock() | |
return nil | |
} | |
} | |
return nil | |
} | |
func main() { | |
p := MakeProg() | |
go func() { | |
time.Sleep(time.Millisecond * 20) | |
in := make(chan string) | |
go func() { | |
in <- "hi there" | |
in <- "don't do that" | |
close(in) | |
}() | |
out, err := p.Connect("1", in) | |
if err != nil { | |
log.Fatal(err) | |
} | |
for m := range out { | |
fmt.Printf("receiving from program: %+v\n", m) | |
} | |
}() | |
go func() { | |
time.Sleep(time.Millisecond * 500) | |
in := make(chan string) | |
go func() { | |
in <- "hi there" | |
in <- "don't do that" | |
close(in) | |
}() | |
out, err := p.Connect("2", in) | |
if err != nil { | |
log.Fatal(err) | |
} | |
for m := range out { | |
fmt.Printf("receiving from program: %+v\n", m) | |
} | |
}() | |
p.Run() | |
fmt.Println("Hello, playground") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment