Created
January 4, 2017 19:20
-
-
Save pascaldekloe/2a2a34fa329cca1d871a65b0aa1a2749 to your computer and use it in GitHub Desktop.
Streaming Colfer records example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func writeAll(w io.Writer, records []*batch.Record) error { | |
var buf []byte | |
for _, r := range records { | |
n, err := r.MarshalLen() | |
if err != nil { | |
return err | |
} | |
if n > len(buf) { | |
buf = make([]byte, n*2) | |
} | |
r.MarshalTo(buf) | |
if _, err = w.Write(buf[:n]); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
func readAll(r io.Reader, sizeEst int) ([]*batch.Record, error) { | |
var records []*batch.Record | |
rec := new(batch.Record) | |
if sizeEst <= 0 { | |
sizeEst = batch.ColferSizeMax | |
} | |
buf := make([]byte, sizeEst) | |
var i int | |
for { | |
// grow buf when full (safe with ColferSizeMax) | |
if i == len(buf) { | |
bigger := make([]byte, len(buf)*2) | |
copy(bigger, buf) | |
buf = bigger | |
} | |
switch read, err := r.Read(buf[i:]); err { | |
case nil: | |
i += read | |
case io.EOF: | |
if i == 0 { // success | |
return records, nil | |
} | |
return nil, fmt.Errorf("EOF with %d tailing bytes", i) | |
default: | |
return nil, err | |
} | |
var offset int | |
for { | |
switch n, err := rec.Unmarshal(buf[offset:i]); err { | |
case nil: | |
records = append(records, rec) | |
rec = new(batch.Record) | |
offset += n | |
if offset < i { | |
continue | |
} | |
i = 0; | |
case io.EOF: | |
if offset < i { | |
i = copy(buf, buf[offset:i]) | |
} else { | |
i = 0 | |
} | |
default: | |
return nil, err | |
} | |
break | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment