Created
January 21, 2016 03:57
-
-
Save gigablah/f210c05ce03743b16318 to your computer and use it in GitHub Desktop.
Multiple channel debouncing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
"sync" | |
) | |
type APIEvents struct { | |
Status string | |
ID int | |
} | |
type Config struct { | |
Name string | |
Min time.Duration | |
Max time.Duration | |
} | |
type Client struct { | |
Listener chan *APIEvents | |
} | |
func NewClient(done chan struct{}) (*Client, error) { | |
client := &Client{ | |
Listener: nil, | |
} | |
go func(client *Client) { | |
time.Sleep(100 * time.Millisecond) | |
fmt.Printf("[%s] Starting to send events...\n", time.Since(start)) | |
for i := 0; i < 30; i++ { | |
if client.Listener != nil { | |
client.Listener <- &APIEvents{"start", i} | |
} | |
time.Sleep(30 * time.Millisecond) | |
} | |
time.Sleep(500 * time.Millisecond) | |
for i := 30; i < 130; i++ { | |
if client.Listener != nil { | |
client.Listener <- &APIEvents{"start", i} | |
} | |
time.Sleep(10 * time.Millisecond) | |
} | |
time.Sleep(500 * time.Millisecond) | |
if client.Listener != nil { | |
client.Listener <- &APIEvents{"stop", 130} | |
} | |
time.Sleep(500 * time.Millisecond) | |
done <- struct{}{} | |
}(client) | |
return client, nil | |
} | |
func (client *Client) AddEventListener(c chan *APIEvents) error { | |
client.Listener = c | |
return nil | |
} | |
func (client *Client) RemoveEventListener(c chan *APIEvents) error { | |
client.Listener = nil | |
return nil | |
} | |
func (client *Client) Ping() error { | |
return nil | |
} | |
var start time.Time | |
func init() { | |
start = time.Now() | |
} | |
func doSomething(config string) { | |
fmt.Printf("[%s] Doing expensive operation for %s...\n", time.Since(start), config) | |
time.Sleep(100 * time.Millisecond) | |
} | |
func main() { | |
var wg sync.WaitGroup | |
done := make(chan struct{}) | |
defer close(done) | |
var client *Client | |
var watchers []chan *APIEvents | |
configs := [2]Config { | |
Config{ | |
Name: "config1", | |
Min: 500 * time.Millisecond, | |
Max: 1 * time.Second, | |
}, | |
Config{ | |
Name: "config2", | |
Min: 1 * time.Second, | |
Max: 2 * time.Second, | |
}, | |
} | |
for _, config := range configs { | |
wg.Add(1) | |
watcher := make(chan *APIEvents, 100) | |
watchers = append(watchers, watcher) | |
go func(config Config, watcher chan *APIEvents) { | |
defer wg.Done() | |
debouncedChan := debounce(watcher, config.Min, config.Max) | |
for _ = range debouncedChan { | |
doSomething(config.Name) | |
} | |
}(config, watcher) | |
} | |
// Maintains client connection and passes events to watchers | |
go func() { | |
eventChan := make(chan *APIEvents, 100) | |
defer close(eventChan) | |
for { | |
if client == nil { | |
var err error | |
client, err = NewClient(done) | |
if err != nil { | |
fmt.Printf("[%s] Error connecting to client!\n", time.Since(start)) | |
time.Sleep(1 * time.Second) | |
continue | |
} | |
} | |
watching := false | |
for { | |
if client == nil { | |
break | |
} | |
if !watching { | |
client.AddEventListener(eventChan) | |
watching = true | |
fmt.Printf("[%s] Watching client events...\n", time.Since(start)) | |
} | |
select { | |
case event := <-eventChan: | |
if event == nil { | |
if watching { | |
client.RemoveEventListener(eventChan) | |
watching = false | |
client = nil | |
} | |
break | |
} | |
if event.Status == "start" || event.Status == "stop" || event.Status == "die" { | |
fmt.Printf("[%s] Received event %s with ID %d\n", time.Since(start), event.Status, event.ID) | |
// fanout event to all watchers | |
for _, watcher := range watchers { | |
watcher <- event | |
} | |
} | |
case <-time.After(10 * time.Second): | |
// check for client liveness | |
err := client.Ping() | |
if err != nil { | |
fmt.Printf("[%s] Unable to ping client: %s", time.Since(start)) | |
if watching { | |
client.RemoveEventListener(eventChan) | |
watching = false | |
client = nil | |
} | |
} | |
case <-done: | |
for _, watcher := range watchers { | |
close(watcher) | |
} | |
return | |
} | |
} | |
} | |
}() | |
wg.Wait() | |
fmt.Printf("[%s] Exited loop...\n", time.Since(start)) | |
time.Sleep(100 * time.Millisecond) | |
fmt.Printf("[%s] Done.\n", time.Since(start)) | |
} | |
func debounce(input chan *APIEvents, min time.Duration, max time.Duration) chan *APIEvents { | |
if min == 0 { | |
return input | |
} | |
output := make(chan *APIEvents, 100) | |
go func() { | |
var ( | |
buffer *APIEvents | |
minTimer <-chan time.Time | |
maxTimer <-chan time.Time | |
) | |
defer close(output) | |
// Start debouncing | |
for { | |
select { | |
case event, ok := <-input: | |
if !ok { | |
return | |
} | |
buffer = event | |
minTimer = time.After(min) | |
if maxTimer == nil { | |
maxTimer = time.After(max) | |
} | |
case <-minTimer: | |
fmt.Printf("[%s] Debounce MinTimer fired!\n", time.Since(start)) | |
minTimer, maxTimer = nil, nil | |
output <- buffer | |
case <-maxTimer: | |
fmt.Printf("[%s] Debounce MaxTimer fired!\n", time.Since(start)) | |
minTimer, maxTimer = nil, nil | |
output <- buffer | |
} | |
} | |
}() | |
return output | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment