Skip to content

Instantly share code, notes, and snippets.

@gigablah
Created January 21, 2016 03:57
Show Gist options
  • Save gigablah/f210c05ce03743b16318 to your computer and use it in GitHub Desktop.
Save gigablah/f210c05ce03743b16318 to your computer and use it in GitHub Desktop.
Multiple channel debouncing
package main
import (
"fmt"
"time"
"sync"
)
type APIEvents struct {
Status string
ID int
}
type Config struct {
Name string
Min time.Duration
Max time.Duration
}
type Client struct {
Listener chan *APIEvents
}
func NewClient(done chan struct{}) (*Client, error) {
client := &Client{
Listener: nil,
}
go func(client *Client) {
time.Sleep(100 * time.Millisecond)
fmt.Printf("[%s] Starting to send events...\n", time.Since(start))
for i := 0; i < 30; i++ {
if client.Listener != nil {
client.Listener <- &APIEvents{"start", i}
}
time.Sleep(30 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
for i := 30; i < 130; i++ {
if client.Listener != nil {
client.Listener <- &APIEvents{"start", i}
}
time.Sleep(10 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
if client.Listener != nil {
client.Listener <- &APIEvents{"stop", 130}
}
time.Sleep(500 * time.Millisecond)
done <- struct{}{}
}(client)
return client, nil
}
func (client *Client) AddEventListener(c chan *APIEvents) error {
client.Listener = c
return nil
}
func (client *Client) RemoveEventListener(c chan *APIEvents) error {
client.Listener = nil
return nil
}
func (client *Client) Ping() error {
return nil
}
var start time.Time
func init() {
start = time.Now()
}
func doSomething(config string) {
fmt.Printf("[%s] Doing expensive operation for %s...\n", time.Since(start), config)
time.Sleep(100 * time.Millisecond)
}
func main() {
var wg sync.WaitGroup
done := make(chan struct{})
defer close(done)
var client *Client
var watchers []chan *APIEvents
configs := [2]Config {
Config{
Name: "config1",
Min: 500 * time.Millisecond,
Max: 1 * time.Second,
},
Config{
Name: "config2",
Min: 1 * time.Second,
Max: 2 * time.Second,
},
}
for _, config := range configs {
wg.Add(1)
watcher := make(chan *APIEvents, 100)
watchers = append(watchers, watcher)
go func(config Config, watcher chan *APIEvents) {
defer wg.Done()
debouncedChan := debounce(watcher, config.Min, config.Max)
for _ = range debouncedChan {
doSomething(config.Name)
}
}(config, watcher)
}
// Maintains client connection and passes events to watchers
go func() {
eventChan := make(chan *APIEvents, 100)
defer close(eventChan)
for {
if client == nil {
var err error
client, err = NewClient(done)
if err != nil {
fmt.Printf("[%s] Error connecting to client!\n", time.Since(start))
time.Sleep(1 * time.Second)
continue
}
}
watching := false
for {
if client == nil {
break
}
if !watching {
client.AddEventListener(eventChan)
watching = true
fmt.Printf("[%s] Watching client events...\n", time.Since(start))
}
select {
case event := <-eventChan:
if event == nil {
if watching {
client.RemoveEventListener(eventChan)
watching = false
client = nil
}
break
}
if event.Status == "start" || event.Status == "stop" || event.Status == "die" {
fmt.Printf("[%s] Received event %s with ID %d\n", time.Since(start), event.Status, event.ID)
// fanout event to all watchers
for _, watcher := range watchers {
watcher <- event
}
}
case <-time.After(10 * time.Second):
// check for client liveness
err := client.Ping()
if err != nil {
fmt.Printf("[%s] Unable to ping client: %s", time.Since(start))
if watching {
client.RemoveEventListener(eventChan)
watching = false
client = nil
}
}
case <-done:
for _, watcher := range watchers {
close(watcher)
}
return
}
}
}
}()
wg.Wait()
fmt.Printf("[%s] Exited loop...\n", time.Since(start))
time.Sleep(100 * time.Millisecond)
fmt.Printf("[%s] Done.\n", time.Since(start))
}
func debounce(input chan *APIEvents, min time.Duration, max time.Duration) chan *APIEvents {
if min == 0 {
return input
}
output := make(chan *APIEvents, 100)
go func() {
var (
buffer *APIEvents
minTimer <-chan time.Time
maxTimer <-chan time.Time
)
defer close(output)
// Start debouncing
for {
select {
case event, ok := <-input:
if !ok {
return
}
buffer = event
minTimer = time.After(min)
if maxTimer == nil {
maxTimer = time.After(max)
}
case <-minTimer:
fmt.Printf("[%s] Debounce MinTimer fired!\n", time.Since(start))
minTimer, maxTimer = nil, nil
output <- buffer
case <-maxTimer:
fmt.Printf("[%s] Debounce MaxTimer fired!\n", time.Since(start))
minTimer, maxTimer = nil, nil
output <- buffer
}
}
}()
return output
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment