Last active
October 29, 2015 12:08
-
-
Save nabeken/7f85026ccdb828f7bca7 to your computer and use it in GitHub Desktop.
Tempfile-based ReadSeekCloser implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ioutils | |
import ( | |
"io" | |
"io/ioutil" | |
"log" | |
"os" | |
"sync" | |
"sync/atomic" | |
) | |
// ReadSeekCloser groups the basic Read, Seek and Close methods.
//
// It is the interface type returned by NewReadSeeker.
type ReadSeekCloser interface {
	io.ReadSeeker
	io.Closer
}
// ReadSeeker makes a forward-only io.Reader seekable by spooling it into a
// temporary file. A background goroutine (copy) drains the source into the
// file through rw, while Read and Seek are served from rs, a second handle
// opened on the same file.
type ReadSeeker struct {
	// wpos is the number of bytes copy() has written to rw.
	// rpos is the current offset of rs.
	//
	// Both are accessed with the 64-bit sync/atomic functions, so they are
	// kept at the very start of the struct: only the first word of an
	// allocated struct is guaranteed to be 64-bit aligned on 32-bit
	// platforms.
	wpos int64
	rpos int64

	// sr is the source reader being spooled.
	sr io.Reader

	// rs is the read handle on the tempfile-based buffer.
	rs *os.File

	// rw is the write handle on the tempfile-based buffer.
	rw *os.File

	// mu is the locker behind copycond; Read holds it for its whole body
	// and copy() takes it while closing rw.
	mu sync.Mutex

	// err holds an error from sr or rw recorded by copy().
	err error

	// closed reports whether rw has been closed, i.e. copying is over.
	closed bool

	// copybuf is the scratch buffer between sr and rw.
	copybuf []byte

	// copych is closed when copying is done.
	copych chan struct{}

	// copycond is signaled when copy() writes data to rw or finishes.
	copycond *sync.Cond

	// once guards the lazy start of the copy() goroutine.
	once sync.Once
}
func (rs *ReadSeeker) Read(p []byte) (int, error) { | |
rs.once.Do(func() { go rs.copy() }) | |
rs.mu.Lock() | |
defer rs.mu.Unlock() | |
rpos := atomic.LoadInt64(&rs.rpos) | |
wpos := atomic.LoadInt64(&rs.wpos) | |
if !rs.closed && wpos == rpos { | |
// we are waiting for data to be available | |
log.Printf("read: waiting for data to be available: rpos:%d wpos: %d", rs.rpos, rs.wpos) | |
rs.copycond.Wait() | |
log.Printf("read: done waiting for data to be available: rpos:%d wpos: %d", rs.rpos, rs.wpos) | |
} | |
n, err := rs.rs.Read(p) | |
atomic.AddInt64(&rs.rpos, int64(n)) | |
if rs.closed { | |
log.Printf("read: copying is over: err:%s rs.err:%s rpos:%d wpos: %d", err, rs.err, rs.rpos, rs.wpos) | |
// copying is over | |
if rs.err != nil { | |
err = rs.err | |
} | |
// we can return io.EOF after copying is over | |
return n, err | |
} | |
log.Printf("read: copying is still ongoing: err:%s rpos:%d wpos: %d", err, rs.rpos, rs.wpos) | |
// copying is still ongoing so even if we got EOF from rs | |
if err == io.EOF { | |
err = nil | |
} | |
return n, err | |
} | |
func (rs *ReadSeeker) Seek(offset int64, whence int) (int64, error) { | |
rs.once.Do(func() { go rs.copy() }) | |
n, err := rs.rs.Seek(offset, whence) | |
atomic.StoreInt64(&rs.rpos, n) | |
log.Printf("seek: seek pos:%d offset:%d whence:%d err:%s rpos:%d wpos: %d", n, offset, whence, err, rs.rpos, rs.wpos) | |
return n, err | |
} | |
func (rs *ReadSeeker) Close() error { | |
// Closing reader | |
rs.rs.Close() | |
// Closing rw but we should wait for copy() to be finishd | |
// since copying is happened in another goroutine | |
<-rs.copych | |
return os.Remove(rs.rw.Name()) | |
} | |
func (rs *ReadSeeker) copy() { | |
for { | |
nr, err := rs.sr.Read(rs.copybuf) | |
if nr > 0 { | |
nw, ew := rs.rw.Write(rs.copybuf[0:nr]) | |
if ew != nil { | |
rs.err = ew | |
break | |
} | |
if nr != nw { | |
rs.err = io.ErrShortWrite | |
break | |
} | |
atomic.AddInt64(&rs.wpos, int64(nw)) | |
log.Printf("copy: read: n:%d rpos:%d wpos:%d", nw, rs.rpos, rs.wpos) | |
// signal that data is available for read | |
rs.copycond.Signal() | |
} | |
if err == io.EOF { | |
break | |
} | |
if err != nil { | |
rs.err = err | |
break | |
} | |
} | |
// we're done copying so closing... | |
// reader must be blocked while we're closing | |
rs.mu.Lock() | |
defer rs.mu.Unlock() | |
if err := rs.rw.Close(); err != nil { | |
rs.err = err | |
} | |
rs.closed = true | |
close(rs.copych) | |
log.Printf("copy: done: rpos:%d wpos:%d", rs.rpos, rs.wpos) | |
// signal that data is available for last read | |
rs.copycond.Signal() | |
} | |
func NewReadSeeker(r_ io.Reader) (ReadSeekCloser, error) { | |
rw, err := ioutil.TempFile("", "hoge") | |
if err != nil { | |
return nil, err | |
} | |
rs, err := os.Open(rw.Name()) | |
if err != nil { | |
return nil, err | |
} | |
rsc := &ReadSeeker{ | |
sr: r_, | |
rs: rs, | |
rw: rw, | |
// https://golang.org/src/io/io.go?s=12227:12287#L378 | |
copybuf: make([]byte, 32*1024), | |
// this should be buffered since copy will send a message | |
// when no one wait on Close() | |
copych: make(chan struct{}, 1), | |
} | |
rsc.copycond = sync.NewCond(&rsc.mu) | |
return rsc, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment