Skip to content

Instantly share code, notes, and snippets.

@anxiousmodernman
Created July 30, 2017 19:36
Show Gist options
  • Save anxiousmodernman/5a128b8c9b40bf1f45c7577bbff782c0 to your computer and use it in GitHub Desktop.
Save anxiousmodernman/5a128b8c9b40bf1f45c7577bbff782c0 to your computer and use it in GitHub Desktop.
Adapting grpc streaming endpoint to io.Reader
// PutStreamAdapter turns our GMData_PutStreamServer into an io.Reader
type PutStreamAdapter struct {
stream GMData_PutStreamServer
buf *bytes.Buffer
eof bool
}
// NewPutStreamAdapter initializes our io.Reader wrapper type for the PutStream endpoint.
func NewPutStreamAdapter(s GMData_PutStreamServer) *PutStreamAdapter {
var psa PutStreamAdapter
psa.stream = s
psa.buf = bytes.NewBuffer(make([]byte, 0))
return &psa
}
// Read implements io.Reader for PutStreamAdapter. We only expect Chunk
// types to be read from the stream.
func (psa *PutStreamAdapter) Read(b []byte) (int, error) {
var n int
// first, check our internal buffer for contents. If they exist, read from there.
if psa.buf.Len() > 0 {
var err error
n, err = psa.buf.Read(b)
if err == io.EOF {
if psa.eof {
// We are at the end of the grpc stream, AND we have drained our
// internal buffer, return EOF to our consumer.
if n != 0 {
panic("expected internal buffer to be empty")
}
return n, err
}
// We are not at the end of our grpc stream. Do not return EOF to our
// consumer. They should take n bytes from b and call Read again.
psa.buf.Reset()
return n, nil
}
if err != nil {
psa.buf.Reset()
return n, err
}
return n, nil
}
frg, err := psa.stream.Recv()
if err != nil && err != io.EOF {
// Some kind of grpc stream error has occured.
return 0, err
}
if err == io.EOF {
psa.eof = true
if psa.buf.Len() == 0 {
// End of grpc stream, and our buffer is empty. Return EOF.
return 0, io.EOF
}
// delegate to our buffer's read. This might return EOF, or the caller might
// call Read again.
return psa.buf.Read(b)
}
// Cast our StreamFragment to the expected type.
ch, ok := frg.Msg.(*StreamFragment_Chunk)
if !ok {
t := reflect.TypeOf(frg.Msg)
return 0, fmt.Errorf("expected *StreamFragment_Chunk type but got %v", t.Name())
}
// Write our data to our internal bytes.Buffer, which grows automatically.
_, err = psa.buf.Write(ch.Chunk.Data)
if err != nil {
return 0, err
}
// read immediately from our buffer
return psa.buf.Read(b)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment