Created
August 4, 2023 00:03
-
-
Save fionera/418229c28891eba7dd0f8b019d180a38 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fdcache | |
import ( | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
type File interface { | |
io.Reader | |
io.Writer | |
io.Seeker | |
io.Closer | |
Name() string | |
} | |
func Open(name string) (File, error) { | |
return OpenFile(name, os.O_RDONLY, 0) | |
} | |
func OpenFile(name string, flag int, perm os.FileMode) (File, error) { | |
return DefaultCache.OpenFile(name, flag, perm) | |
} | |
type Cache interface { | |
OpenFile(name string, flag int, perm os.FileMode) (File, error) | |
closeFile(*cachedFile) error | |
} | |
type key struct { | |
name string | |
flag int | |
} | |
type filePool struct { | |
key | |
pool sync.Pool | |
cleanupMtx sync.RWMutex | |
ticker *time.Ticker | |
timeout time.Duration | |
openFDs int64 | |
} | |
func (f *filePool) new() any { | |
log.Printf("new(): %s", f.name) | |
fd, err := os.OpenFile(f.name, f.flag, 0) | |
if err != nil { | |
return err | |
} | |
return fd | |
} | |
func (f *filePool) Get() any { | |
f.ticker.Reset(f.timeout) | |
f.cleanupMtx.RLock() | |
defer f.cleanupMtx.RUnlock() | |
atomic.AddInt64(&f.openFDs, 1) | |
return f.pool.Get() | |
} | |
func (f *filePool) Put(x any) { | |
f.cleanupMtx.RLock() | |
defer f.cleanupMtx.RUnlock() | |
fd, ok := x.(*os.File) | |
if !ok { | |
panic("cant put a non file into a filePool") | |
} | |
if fd.Name() != f.name { | |
panic("cant put files with different names into same pool") | |
} | |
_, err := fd.Seek(0, io.SeekStart) | |
if err != nil { | |
panic("cant seek to start of file: " + err.Error()) | |
} | |
atomic.AddInt64(&f.openFDs, -1) | |
f.pool.Put(x) | |
} | |
func (f *filePool) cleanup(parentCleanup func(k key)) { | |
defer f.ticker.Stop() | |
for { | |
<-f.ticker.C | |
f.cleanupMtx.Lock() | |
if atomic.LoadInt64(&f.openFDs) != 0 { | |
log.Println("checking for cleanup, but found open fds") | |
f.ticker.Reset(f.timeout) | |
f.cleanupMtx.Unlock() | |
continue | |
} | |
break | |
} | |
log.Println("starting cleanup for: ", f.key) | |
defer log.Println("done cleanup for: ", f.key) | |
f.pool.New = nil | |
for { | |
get := f.pool.Get() | |
if get == nil { | |
break | |
} | |
err := get.(*os.File).Close() | |
if err != nil { | |
log.Println("failed closing: ", err) | |
} | |
} | |
f.pool.New = f.new | |
parentCleanup(f.key) | |
f.cleanupMtx.Unlock() | |
} | |
func newFilePool(k key, timeout time.Duration, parentCleanup func(k key)) *filePool { | |
f := &filePool{key: k, timeout: timeout, ticker: time.NewTicker(timeout)} | |
f.pool.New = f.new | |
go f.cleanup(parentCleanup) | |
return f | |
} | |
type timerCache struct { | |
mtx sync.Mutex | |
fds map[key]*filePool | |
timeout time.Duration | |
} | |
func (t *timerCache) closeFile(file *cachedFile) error { | |
t.mtx.Lock() | |
pool, ok := t.fds[file.k] | |
if !ok { | |
panic("cant file filepool for existing fd") | |
} | |
t.mtx.Unlock() | |
pool.Put(file.f) | |
return nil | |
} | |
func (t *timerCache) cleanup(k key) { | |
t.mtx.Lock() | |
delete(t.fds, k) | |
t.mtx.Unlock() | |
} | |
func (t *timerCache) OpenFile(name string, flag int, perm os.FileMode) (File, error) { | |
if flag&os.O_CREATE != 0 { | |
return nil, fmt.Errorf("creating files is not supported") | |
} | |
t.mtx.Lock() | |
k := key{name, flag} | |
pool, ok := t.fds[k] | |
if !ok { | |
fp := newFilePool(k, t.timeout, t.cleanup) | |
t.fds[k] = fp | |
pool = fp | |
} | |
t.mtx.Unlock() | |
get := pool.Get() | |
if err, ok := get.(error); ok { | |
return nil, err | |
} | |
return &cachedFile{f: get.(*os.File), cache: t, k: k}, nil | |
} | |
type cachedFile struct { | |
cache Cache | |
k key | |
f *os.File | |
closed bool | |
mtx sync.RWMutex | |
} | |
func (f *cachedFile) Read(p []byte) (n int, err error) { | |
f.mtx.Lock() | |
if !f.closed { | |
return 0, os.ErrClosed | |
} | |
f.mtx.Unlock() | |
return f.f.Read(p) | |
} | |
func (f *cachedFile) Write(p []byte) (n int, err error) { | |
f.mtx.Lock() | |
if !f.closed { | |
return 0, os.ErrClosed | |
} | |
f.mtx.Unlock() | |
return f.f.Write(p) | |
} | |
func (f *cachedFile) Seek(offset int64, whence int) (int64, error) { | |
f.mtx.Lock() | |
if !f.closed { | |
return 0, os.ErrClosed | |
} | |
f.mtx.Unlock() | |
return f.f.Seek(offset, whence) | |
} | |
func (f *cachedFile) Name() string { | |
return f.k.name | |
} | |
func (f *cachedFile) Close() error { | |
f.mtx.Lock() | |
if f.closed { | |
return os.ErrClosed | |
} | |
f.mtx.Unlock() | |
return f.cache.closeFile(f) | |
} | |
// NewTimerCache creates a Cache that invalidates fds which where not | |
// used for the given timeout duration. | |
func NewTimerCache(timeout time.Duration) Cache { | |
return &timerCache{ | |
timeout: timeout, | |
fds: make(map[key]*filePool), | |
} | |
} | |
var DefaultCache = NewTimerCache(5 * time.Second) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment