Last active
April 25, 2024 20:51
-
-
Save jmackie/11570bdcd8a4c10d72619a5e1f21c5f8 to your computer and use it in GitHub Desktop.
Pass a single io.Reader to multiple goroutines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Package fan is a little concurrent io experiment. | |
Example Use Case | |
---------------- | |
You have a function that takes a single io.Reader as an argument. You would like | |
to pass that reader to several processing functions. You could just make the | |
function accept an io.ReadSeeker, invoke each function serially in a for loop, | |
seeking after each call. But that's not cool. | |
So for example: | |
func process(r io.ReadSeeker) error { | |
for _, fn := range fns { | |
err := fn(r) | |
if err != nil { | |
return err | |
} | |
r.Seek(0, io.SeekStart) | |
} | |
} | |
Becomes: | |
import "./fan" | |
func process(r io.Reader) error { | |
fr := fan.Reader{Reader: r} | |
for _, fn := range fns { | |
go fn(fr.View()) | |
// ... handle errors via a channel | |
} | |
} | |
Limitations | |
----------- | |
Probably quite a lot. | |
*/ | |
package fan | |
import ( | |
"bytes" | |
"io" | |
"sync" | |
) | |
// Reader wraps an io.Reader, allowing "clones" to be created via the View method.
//
// Every byte read from the wrapped reader is retained in memory so that each
// view can replay the stream from its first byte. A Reader contains a mutex and
// must therefore not be copied after first use.
type Reader struct {
	io.Reader

	s   []byte     // data read from the embedded io.Reader so far; guarded by mux
	mux sync.Mutex // serialises access to s and to the embedded io.Reader
}

// View returns a new io.Reader that yields the same byte stream as the wrapped
// io.Reader, starting from the first byte. Each view keeps its own read offset,
// and every Read takes the Reader's mutex, so separate views may be consumed
// from separate goroutines.
//
// A Read is served from the shared buffer whenever the view still has buffered
// bytes ahead of it; the wrapped reader is consulted only once the view has
// caught up with the buffer. An error from the wrapped reader (including
// io.EOF) is returned by the Read call that triggered it, together with any
// bytes obtained by that same call, so a view never reports an error while
// buffered data remains unread.
func (r *Reader) View() io.Reader {
	var off int // this view's read offset into r.s

	return readFunc(func(p []byte) (int, error) {
		if len(p) == 0 {
			return 0, nil
		}

		r.mux.Lock()
		defer r.mux.Unlock()

		// Another view has already pulled data past our offset: hand back what
		// is buffered instead of blocking on (or surfacing an error from) the
		// wrapped reader while unread bytes remain.
		if off < len(r.s) {
			n := copy(p, r.s[off:])
			off += n
			return n, nil
		}

		// This view has caught up with the buffer, so new bytes can be read
		// straight into p and then recorded for the other views.
		n, err := r.Reader.Read(p)
		r.s = append(r.s, p[:n]...)
		off += n
		return n, err
	})
}

// Original returns the "original" io.Reader without any thread-safety working
// behind the scenes: the bytes buffered so far followed by whatever is left in
// the wrapped reader. It does not take the Reader's mutex, so it must not be
// called or consumed while views are still being read.
func (r *Reader) Original() io.Reader {
	return io.MultiReader(bytes.NewReader(r.s), r.Reader)
}

// readFunc follows the design of http.HandlerFunc: it adapts a plain function
// to the io.Reader interface, allowing a closure to carry per-reader state.
type readFunc func(p []byte) (n int, err error)

// Read calls rf(p).
func (rf readFunc) Read(p []byte) (n int, err error) { return rf(p) }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fan | |
import ( | |
"bytes" | |
"sync" | |
"testing" | |
"github.com/drhodes/golorem" | |
) | |
// We will be reading some random lorem ipsum text. | |
var data = []byte(lorem.Paragraph(500, 600)) | |
func TestReader(t *testing.T) { | |
r := bytes.NewReader(data) | |
fr := Reader{r, // fan.Reader | |
// Because this is an internal test we need to completely intialise | |
// the struct. But note the API would look like `fan.Reader{r}` | |
[]byte(nil), *new(sync.Mutex), | |
} | |
wg := new(sync.WaitGroup) | |
// Start 10x goroutines | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go func(i int) { | |
defer wg.Done() | |
buf := new(bytes.Buffer) | |
n, err := buf.ReadFrom(fr.View()) | |
// Check the error | |
if err != nil { | |
t.Errorf("Goroutine %d err: %v", i, err) | |
return | |
} | |
// Validate the result | |
if int(n) != len(data) { | |
t.Errorf("Goroutine %d err: expected %d bytes, got %d", i, len(data), n) | |
return | |
} | |
t.Logf("Goroutine %d ok: got %d bytes\n", i, len(buf.Bytes())) | |
}(i) | |
} | |
wg.Wait() | |
} |
Answering titolins question for anyone who might come across:
The mutex and byte slice in the Reader structure are already initialized to their zero values when creating a new Reader{}
.
@jmackie: I just came across this gist. I had need of something similar a while back: streamcache
has the same goal as your Reader
, but with a bunch of bells & whistles, such as discarding the cache when it's no longer needed.
It's nice to know somebody else was dealing with the same problem 🤓
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Helped me a lot, thanks man. I was trying to make concurrent put requests and didn't realize the buffer wasn't thread safe.
Just a question, though (might be really stupid). How do you initialize
Reader
, considering s
and mux
are unexported? I just wrote a simple constructor using the code from your test file. Is that intended? I imagine you don't use this as an external package.
Cheers