Skip to content

Instantly share code, notes, and snippets.

@pascaldekloe
Created January 4, 2017 19:20
Show Gist options
  • Save pascaldekloe/2a2a34fa329cca1d871a65b0aa1a2749 to your computer and use it in GitHub Desktop.
Save pascaldekloe/2a2a34fa329cca1d871a65b0aa1a2749 to your computer and use it in GitHub Desktop.
Streaming Colfer records example
// writeAll marshals every record in records, in slice order, and writes the
// resulting bytes to w.
//
// One scratch buffer is shared by all records; it is reallocated (at twice the
// required length) only when the next record does not fit.
//
// The first error reported by MarshalLen or by w.Write aborts the loop and is
// returned unchanged. A nil return means every record was written.
func writeAll(w io.Writer, records []*batch.Record) error {
	var scratch []byte
	for idx := range records {
		rec := records[idx]

		size, err := rec.MarshalLen()
		if err != nil {
			return err
		}

		// Over-allocate so that slightly larger follow-up records still fit.
		if len(scratch) < size {
			scratch = make([]byte, size*2)
		}

		rec.MarshalTo(scratch)
		_, err = w.Write(scratch[:size])
		if err != nil {
			return err
		}
	}
	return nil
}
// readAll reads r until EOF and decodes the stream as a sequence of
// back-to-back records.
//
// sizeEst is the initial size of the read buffer in bytes; a value <= 0
// selects batch.ColferSizeMax. The buffer is doubled whenever it is completely
// filled with data that does not yet form a whole record.
//
// On success the decoded records are returned in stream order. Any read error
// other than io.EOF, and any Unmarshal error other than io.EOF, is returned
// as-is with a nil slice. If the stream ends while undecoded bytes remain, an
// "EOF with N tailing bytes" error is returned.
func readAll(r io.Reader, sizeEst int) ([]*batch.Record, error) {
	if sizeEst <= 0 {
		sizeEst = batch.ColferSizeMax
	}
	buf := make([]byte, sizeEst)

	var records []*batch.Record
	rec := new(batch.Record)

	var i int // number of pending, not yet decoded bytes at the start of buf
	for {
		// grow buf when full (safe with ColferSizeMax)
		if i == len(buf) {
			bigger := make([]byte, len(buf)*2)
			copy(bigger, buf)
			buf = bigger
		}

		// A Reader may hand back data together with an error, io.EOF
		// included. Count the bytes first so that a final chunk delivered
		// alongside io.EOF is decoded instead of being dropped.
		read, readErr := r.Read(buf[i:])
		i += read
		if readErr != nil && readErr != io.EOF {
			return nil, readErr
		}

		// Decode every complete record currently held in buf[:i].
		var offset int
		for offset < i {
			n, err := rec.Unmarshal(buf[offset:i])
			if err == io.EOF {
				// Unmarshal ran out of input: the record is incomplete
				// and more data is needed before retrying.
				break
			}
			if err != nil {
				return nil, err
			}
			records = append(records, rec)
			rec = new(batch.Record)
			offset += n
		}

		// Move the undecoded remainder (possibly empty) to the front.
		i = copy(buf, buf[offset:i])

		if readErr == io.EOF {
			if i == 0 { // success
				return records, nil
			}
			return nil, fmt.Errorf("EOF with %d tailing bytes", i)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment