Created
January 28, 2015 18:37
-
-
Save karalabe/6de57007034d972b9ab6 to your computer and use it in GitHub Desktop.
Buffered concurrent copy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bufioext | |
import ( | |
"io" | |
"sync/atomic" | |
) | |
// Copy copies from src to dst until either EOF is reached on src or an error
// occurs. It returns the number of bytes copied and the first error encountered
// while copying, if any.
//
// A successful Copy returns err == nil, not err == EOF. Because Copy is defined
// to read from src until EOF, it does not treat an EOF from Read as an error to
// be reported.
//
// Internally, one goroutine is reading the src (in 32 KiB chunks), moving the
// data into an internal ring buffer of the requested size, and another moving
// from the buffer to the writer. This permits both endpoints to run
// simultaneously, without one blocking the other. A buffer size <= 0 selects a
// default of 32 KiB rather than deadlocking on a zero-capacity ring.
//
// If writing to dst fails, the write error is returned even if reading from src
// also failed, because it concerns data that precedes the read failure in the
// stream. After a write failure the reader stops pulling from src once its
// in-flight Read returns; Copy always waits for both goroutines to finish.
func Copy(dst io.Writer, src io.Reader, buffer int) (written int64, err error) {
	if buffer <= 0 {
		buffer = 32 * 1024
	}
	buf := make([]byte, buffer)
	size := buffer

	// filled is the only index-like state shared between the goroutines: the
	// number of buffered bytes not yet written to dst. The reader privately
	// owns wp (next slot to fill) and the writer privately owns rp (next slot
	// to drain). Each side derives the region it may touch from its own index
	// plus this counter, never from the other side's index, so a stale view
	// can only under-estimate the space/data available, never over-estimate it.
	var filled int64

	rs := make(chan struct{}, 1) // wakes the reader after the writer freed space
	ws := make(chan struct{}, 1) // wakes the writer after the reader added data
	rq := make(chan struct{})    // closed when the reader terminates
	wq := make(chan struct{})    // closed when the writer terminates

	// Each goroutine records its own failure so they never race on err.
	var rerr, werr error

	// Reader: pulls chunks from src and pushes them into the ring buffer.
	go func() {
		defer close(rq)
		chunk := make([]byte, 32*1024)
		wp := 0
		for {
			// Stop pulling from src once the writer has given up.
			select {
			case <-wq:
				return
			default:
			}
			nr, er := src.Read(chunk)
			if nr > 0 {
				left := chunk[:nr]
				for len(left) > 0 {
					free := size - int(atomic.LoadInt64(&filled))
					if free == 0 {
						// Ring full: wait until the writer drains some of it.
						select {
						case <-rs:
							continue
						case <-wq:
							return
						}
					}
					n := len(left)
					if n > free {
						n = free
					}
					// The free region is [wp, wp+free) modulo size: fill the
					// tail first and wrap any remainder to the front.
					k := copy(buf[wp:], left[:n])
					copy(buf, left[k:n])
					wp = (wp + n) % size
					// Publish the bytes only after they are in place.
					atomic.AddInt64(&filled, int64(n))
					left = left[n:]

					select {
					case ws <- struct{}{}:
					default:
					}
				}
			}
			if er == io.EOF {
				return
			}
			if er != nil {
				rerr = er
				return
			}
		}
	}()

	// Writer: drains the ring buffer into dst.
	go func() {
		defer close(wq)
		rp := 0
		for {
			avail := int(atomic.LoadInt64(&filled))
			if avail == 0 {
				select {
				case <-ws:
					continue
				case <-rq:
					// The reader may have buffered its final bytes right
					// before quitting; drain them before returning.
					if atomic.LoadInt64(&filled) != 0 {
						continue
					}
					return
				}
			}
			// Write only the contiguous run up to the end of the ring; a
			// wrapped remainder is picked up by the next iteration.
			n := avail
			if n > size-rp {
				n = size - rp
			}
			nw, we := dst.Write(buf[rp : rp+n])
			if nw > 0 {
				written += int64(nw)
			}
			if we != nil {
				werr = we
				return
			}
			if nw != n {
				werr = io.ErrShortWrite
				return
			}
			rp = (rp + n) % size
			// Release the space only after dst is done with the bytes.
			atomic.AddInt64(&filled, -int64(n))

			select {
			case rs <- struct{}{}:
			default:
			}
		}
	}()

	// Wait until both finish; the channel closes order the error and
	// counter writes above before these reads.
	<-wq
	<-rq
	if werr != nil {
		return written, werr
	}
	return written, rerr
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bufioext | |
import ( | |
"bytes" | |
"io/ioutil" | |
"math/rand" | |
"testing" | |
) | |
// random returns length pseudo-random bytes. The source is always seeded with
// 0, so every call yields the same sequence and a shorter result is a prefix of
// a longer one, which keeps tests and benchmarks reproducible.
func random(length int) []byte {
	gen := rand.NewSource(0)
	out := make([]byte, length)
	for i := range out {
		// The byte conversion keeps only the low 8 bits of each draw.
		out[i] = byte(gen.Int63())
	}
	return out
}
// Tests that a simple copy works | |
func TestCopy(t *testing.T) { | |
data := random(128 * 1024 * 1024) | |
rb := bytes.NewBuffer(data) | |
wb := new(bytes.Buffer) | |
if n, err := Copy(wb, rb, 333333); err != nil { // weird buffer size to catch index bugs | |
t.Fatalf("failed to copy data: %v.", err) | |
} else if int(n) != len(data) { | |
t.Fatalf("data length mismatch: have %d, want %d.", n, len(data)) | |
} | |
if bytes.Compare(data, wb.Bytes()) != 0 { | |
t.Errorf("copy did not work properly.") | |
} | |
} | |
// Various combinations of benchmarks to measure the copy. | |
func BenchmarkCopy1KbData1KbBuffer(b *testing.B) { | |
benchmarkCopy(1024, 1024, b) | |
} | |
func BenchmarkCopy1KbData128KbBuffer(b *testing.B) { | |
benchmarkCopy(1024, 128*1024, b) | |
} | |
func BenchmarkCopy1KbData1MbBuffer(b *testing.B) { | |
benchmarkCopy(1024, 1024*1024, b) | |
} | |
func BenchmarkCopy1MbData1KbBuffer(b *testing.B) { | |
benchmarkCopy(1024*1024, 1024, b) | |
} | |
func BenchmarkCopy1MbData128KbBuffer(b *testing.B) { | |
benchmarkCopy(1024*1024, 128*1024, b) | |
} | |
func BenchmarkCopy1MbData1MbBuffer(b *testing.B) { | |
benchmarkCopy(1024*1024, 1024*1024, b) | |
} | |
func BenchmarkCopy128MbData1KbBuffer(b *testing.B) { | |
benchmarkCopy(128*1024*1024, 1024, b) | |
} | |
func BenchmarkCopy128MbData128KbBuffer(b *testing.B) { | |
benchmarkCopy(128*1024*1024, 128*1024, b) | |
} | |
func BenchmarkCopy128MbData1MbBuffer(b *testing.B) { | |
benchmarkCopy(128*1024*1024, 1024*1024, b) | |
} | |
// BenchmarkCopy measures the performance of the buffered copying for a given | |
// buffer size. | |
func benchmarkCopy(data int, buffer int, b *testing.B) { | |
blob := random(data) | |
b.SetBytes(int64(data)) | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
Copy(ioutil.Discard, bytes.NewBuffer(blob), buffer) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment