Skip to content

Instantly share code, notes, and snippets.

@rjeczalik
Last active January 13, 2019 00:46
Show Gist options
  • Save rjeczalik/7faa93691e6da35b1f84 to your computer and use it in GitHub Desktop.
Save rjeczalik/7faa93691e6da35b1f84 to your computer and use it in GitHub Desktop.
queued watch
package main
import (
"log"
"os"
"sync"
"github.com/rjeczalik/notify"
)
func main() {
c, err := QueuedWatch(".", notify.All)
if err != nil {
log.Fatalln(err)
}
for ev := range c {
log.Println(os.Args[0], ev)
}
}
func QueuedWatch(path string, events ...notify.Event) (chan notify.EventInfo, error) {
in := make(chan notify.EventInfo, 10240)
err := notify.Watch(path, in, events...)
if err != nil {
return nil, err
}
out := make(chan notify.EventInfo)
queueEvents(in, out)
return out, nil
}
type eventQueue struct {
mu sync.Mutex
queue []notify.EventInfo
sleep chan struct{}
}
func queueEvents(in, out chan notify.EventInfo) {
ev := &eventQueue{
sleep: make(chan struct{}, 1),
}
go ev.enqueueLoop(in)
go ev.popLoop(out)
}
func (eq *eventQueue) enqueueLoop(in chan notify.EventInfo) {
var sleeping bool
for ev := range in {
eq.mu.Lock()
sleeping = len(eq.queue) == 0
eq.queue = append(eq.queue, ev)
if sleeping {
select {
case eq.sleep <- struct{}{}:
default:
}
}
eq.mu.Unlock()
}
}
func (eq *eventQueue) pop() (ev notify.EventInfo, n int) {
eq.mu.Lock()
defer eq.mu.Unlock()
if n = len(eq.queue); n == 0 {
return nil, 0
}
ev, eq.queue = eq.queue[0], eq.queue[1:]
return ev, n
}
func (eq *eventQueue) popLoop(out chan notify.EventInfo) {
var ev notify.EventInfo
var n int
for range eq.sleep {
for ev, n = eq.pop(); n != 0; ev, n = eq.pop() {
out <- ev
}
}
}
@sent-hil
Copy link

@rjeczalik still running into missing events; ran the above code and did $ for f in {0..150}; do echo $f > "$f.txt"; done in a different terminal. http://take.ms/U4mtF

@rjeczalik
Copy link
Author

Hey @sent-hil, I've verified it on a Koding VM, my QueuedWatch just needs bigger inner-buffer (I've updated the above gist with 10240).

You can verify if there're dropped events - build the binary with debug tag, e.g.:

~ $ go run -tags debug qwatch.go 2>&1 | tee qwatch.log

Then the library will log to stderr any dropped events:

rjeczalik: ~/tmp $ for i in {1..2500}; do echo $i > $i.txt; done   
rjeczalik: ~/tmp $ grep -c 'qwatch notify.Create' ../qwatch.log 
2500
rjeczalik: ~/tmp $ rm *.txt
rjeczalik: ~/tmp $ grep -c 'qwatch notify.Remove' ../qwatch.log       
2500
rjeczalik: ~/tmp $ for i in {1..10000}; do echo $i > $i.txt; done                                                                                                                          
rjeczalik: ~/tmp $ grep -c 'dropped' ../qwatch.log                     
3027
rjeczalik: ~/tmp $ grep -m10 'dropped' ../qwatch.log   
[D] dropped notify.Write on "/home/rjeczalik/tmp/788.txt": receiver too slow                                                                                                            
[D] dropped notify.Create on "/home/rjeczalik/tmp/789.txt": receiver too slow                                                                                                           
[D] dropped notify.Write on "/home/rjeczalik/tmp/789.txt": receiver too slow                                                                                                            
[D] dropped notify.Create on "/home/rjeczalik/tmp/790.txt": receiver too slow                                                                                                           
[D] dropped notify.Write on "/home/rjeczalik/tmp/790.txt": receiver too slow                                                                                                            
[D] dropped notify.Create on "/home/rjeczalik/tmp/791.txt": receiver too slow                                                                                                           
[D] dropped notify.Write on "/home/rjeczalik/tmp/791.txt": receiver too slow                                                                                                            
[D] dropped notify.Create on "/home/rjeczalik/tmp/792.txt": receiver too slow                                                                                                           
[D] dropped notify.Write on "/home/rjeczalik/tmp/792.txt": receiver too slow                                                                                                            
[D] dropped notify.Create on "/home/rjeczalik/tmp/793.txt": receiver too slow  

I'll see if we can add a buffering to the notify API to do it for the user, for now you'll need to keep bumping the buffer yourself :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment