Created
July 30, 2017 19:36
-
-
Save anxiousmodernman/5a128b8c9b40bf1f45c7577bbff782c0 to your computer and use it in GitHub Desktop.
Adapting a gRPC streaming endpoint to io.Reader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// PutStreamAdapter turns our GMData_PutStreamServer into an io.Reader | |
type PutStreamAdapter struct { | |
stream GMData_PutStreamServer | |
buf *bytes.Buffer | |
eof bool | |
} | |
// NewPutStreamAdapter initializes our io.Reader wrapper type for the PutStream endpoint. | |
func NewPutStreamAdapter(s GMData_PutStreamServer) *PutStreamAdapter { | |
var psa PutStreamAdapter | |
psa.stream = s | |
psa.buf = bytes.NewBuffer(make([]byte, 0)) | |
return &psa | |
} | |
// Read implements io.Reader for PutStreamAdapter. We only expect Chunk | |
// types to be read from the stream. | |
func (psa *PutStreamAdapter) Read(b []byte) (int, error) { | |
var n int | |
// first, check our internal buffer for contents. If they exist, read from there. | |
if psa.buf.Len() > 0 { | |
var err error | |
n, err = psa.buf.Read(b) | |
if err == io.EOF { | |
if psa.eof { | |
// We are at the end of the grpc stream, AND we have drained our | |
// internal buffer, return EOF to our consumer. | |
if n != 0 { | |
panic("expected internal buffer to be empty") | |
} | |
return n, err | |
} | |
// We are not at the end of our grpc stream. Do not return EOF to our | |
// consumer. They should take n bytes from b and call Read again. | |
psa.buf.Reset() | |
return n, nil | |
} | |
if err != nil { | |
psa.buf.Reset() | |
return n, err | |
} | |
return n, nil | |
} | |
frg, err := psa.stream.Recv() | |
if err != nil && err != io.EOF { | |
// Some kind of grpc stream error has occured. | |
return 0, err | |
} | |
if err == io.EOF { | |
psa.eof = true | |
if psa.buf.Len() == 0 { | |
// End of grpc stream, and our buffer is empty. Return EOF. | |
return 0, io.EOF | |
} | |
// delegate to our buffer's read. This might return EOF, or the caller might | |
// call Read again. | |
return psa.buf.Read(b) | |
} | |
// Cast our StreamFragment to the expected type. | |
ch, ok := frg.Msg.(*StreamFragment_Chunk) | |
if !ok { | |
t := reflect.TypeOf(frg.Msg) | |
return 0, fmt.Errorf("expected *StreamFragment_Chunk type but got %v", t.Name()) | |
} | |
// Write our data to our internal bytes.Buffer, which grows automatically. | |
_, err = psa.buf.Write(ch.Chunk.Data) | |
if err != nil { | |
return 0, err | |
} | |
// read immediately from our buffer | |
return psa.buf.Read(b) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment