Last active
March 2, 2016 10:25
-
-
Save shaunlee/7003166 to your computer and use it in GitHub Desktop.
RingBuffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"os" | |
"io" | |
"bufio" | |
"runtime" | |
"path/filepath" | |
"encoding/binary" | |
"code.google.com/p/snappy-go/snappy" | |
) | |
const (
	// MAX_LINES is the number of consecutive indexes that share one data file
	// (files are named after index / MAX_LINES). dealt also uses it as the
	// number of deletions between gc runs.
	MAX_LINES uint64 = 10000

	// MAX_UINT64 is the largest uint64 value; NewRingBuffer uses it as the
	// "no head seen yet" sentinel while restoring.
	MAX_UINT64 uint64 = ^uint64(0)
)
// Convenience capacities for NewRingBuffer; each is a power of two, as
// NewRingBuffer requires.
const (
	CAPACITY_1M  uint64 = 1 << 20
	CAPACITY_2M  uint64 = 1 << 21
	CAPACITY_4M  uint64 = 1 << 22
	CAPACITY_8M  uint64 = 1 << 23
	CAPACITY_16M uint64 = 1 << 24
)
// Bit flags stored in the flag byte of every on-disk record.
const (
	// ENTRY_FLAG_COMPRESSED marks a data record whose payload is snappy-encoded.
	ENTRY_FLAG_COMPRESSED uint8 = 0x01
	// ENTRY_FLAG_DELETED marks a record that only says "this index is done".
	ENTRY_FLAG_DELETED uint8 = 0x02
)
// Entry is one item handed out by RingBuffer.Consume.
type Entry interface {
	// Data returns the entry's payload.
	Data() []byte

	// SetData replaces the entry's payload.
	SetData(data []byte)

	// Commit acknowledges the entry; the RingBuffer implementation records
	// the acknowledgement on disk asynchronously.
	Commit()
}
type rbentry struct { | |
index uint64 | |
data []byte | |
rb *RingBuffer | |
} | |
func (p *rbentry) Data() []byte { | |
return p.data | |
} | |
func (p *rbentry) SetData(data []byte) { | |
p.data = data | |
} | |
func (p *rbentry) Commit() { | |
go p.rb.dealt(p) | |
} | |
/** | |
* 文件格式: | |
* CURRENT: 记录队列头(读的位置)和尾(写的位置), 16bits(uint64 * 2) | |
* db00000000.dat: | |
* Package[index:uint64, flag(ENTRY_FLAG_COMPRESSED):uint8, data length:uint32, string] | |
* 已处理完成(删除)的数据: Package[index:uint64, flag(ENTRY_FLAG_DELETED):uint8] | |
*/ | |
type RingBuffer struct { | |
dbname string | |
head, tail, capacity, mod uint64 | |
buffer []Entry | |
gcounter uint64 | |
} | |
/** | |
* capacity 必须是 2 的 N 次方, 用位运算代替 mod 计算 | |
* | |
* 如: index = tail & (capacity - 1) 代替 index = tail % capacity | |
*/ | |
func NewRingBuffer(dbname string, capacity uint64) (*RingBuffer, error) { | |
if capacity == 0 || capacity & (capacity - 1) != 0 { | |
return nil, fmt.Errorf("Capacity must be a power of 2") | |
} | |
if _, err := os.Stat(dbname); err != nil { | |
if !os.IsNotExist(err) { return nil, err } else { | |
if err = os.MkdirAll(dbname, 0755); err != nil { return nil, err } | |
} | |
} | |
p := &RingBuffer{} | |
p.dbname = dbname | |
p.capacity = capacity | |
p.mod = capacity - 1 | |
p.buffer = make([]Entry, p.capacity) | |
log.Println("RingBuffer: restoring the buffer ...") | |
p.head = MAX_UINT64 | |
p.restore() | |
if p.head == MAX_UINT64 { p.head = 0 } | |
if p.tail - p.head > capacity { return nil, fmt.Errorf("Capacity is not enough to recover the buffer") } | |
p.skip() | |
log.Println("RingBuffer: now i'm ready") | |
go p.gc() | |
return p, nil | |
} | |
func (p *RingBuffer) restore() { | |
var isEmpty = true | |
if files, err := filepath.Glob(fmt.Sprintf("%s/*.dat", p.dbname)); err == nil { | |
for _, filename := range files { | |
if fp, err := os.Open(filename); err == nil { | |
reader := bufio.NewReader(fp) | |
for { | |
var ( | |
length uint32 | |
flag uint8 | |
compressed, deleted bool | |
nread int = 0 | |
entry = &rbentry{rb: p} | |
) | |
if err := binary.Read(reader, binary.LittleEndian, &entry.index); err == io.EOF { | |
break | |
} | |
if err := binary.Read(reader, binary.LittleEndian, &flag); err == io.EOF { | |
break | |
} else { | |
compressed = (flag & ENTRY_FLAG_COMPRESSED) == ENTRY_FLAG_COMPRESSED | |
deleted = (flag & ENTRY_FLAG_DELETED) == ENTRY_FLAG_DELETED | |
} | |
if deleted { | |
p.buffer[entry.index & p.mod] = nil | |
} else { | |
isEmpty = false | |
if err := binary.Read(reader, binary.LittleEndian, &length); err == io.EOF { | |
break | |
} else { | |
data := make([]byte, length) | |
for nread < int(length) { | |
if n, err := reader.Read(data[nread:]); err != nil { break } else { nread += n } | |
} | |
entry.SetData(data) | |
if compressed { | |
if buf, err := snappy.Decode(nil, data); err == nil { | |
entry.SetData(buf) | |
} | |
} | |
} | |
p.buffer[entry.index & p.mod] = entry | |
if entry.index < p.head { | |
p.head = entry.index | |
} | |
if entry.index > p.tail { | |
p.tail = entry.index | |
} | |
} | |
} | |
fp.Close() | |
} | |
} | |
} | |
if !isEmpty { p.tail++ } | |
} | |
func (p *RingBuffer) skip() { | |
if p.tail > p.head { | |
var capables = make([]uint64, 0) | |
for index := p.tail - 1; index >= p.head; index-- { | |
entry := p.buffer[index & p.mod] | |
if entry == nil { | |
capables = append(capables, index) | |
} else if len(capables) > 0 { | |
//[0, 1, 2, 3, 4] | |
// delete 1, 3 | |
//[0, -, 2, -, 4] | |
//[0, -, -, 2, 4] | |
//[-, -, 0, 2, 4] | |
entry.(*rbentry).index = capables[0] | |
p.buffer[capables[0] & p.mod] = entry | |
capables = append(capables[1:], index) | |
} | |
if index == 0 { break } | |
} | |
if n := uint64(len(capables)); n > 0 { | |
p.head = p.head + n | |
} | |
} | |
} | |
func (p *RingBuffer) gc() { | |
index := p.head | |
if index - index % MAX_LINES > 0 { | |
index = index / MAX_LINES - 1 | |
for index >= 0 { | |
filename := fmt.Sprintf("%s/db%.8x.dat", p.dbname, index) | |
if _, err := os.Stat(filename); err != nil { | |
break | |
} else { | |
os.Remove(filename) | |
index-- | |
} | |
} | |
} | |
} | |
func (p *RingBuffer) Publish(data []byte) error { | |
if p.tail - p.head >= p.capacity { | |
return fmt.Errorf("There are no writable entry yet") | |
} | |
entry := &rbentry{index: p.tail, rb: p} | |
entry.SetData(data) | |
go p.persist(entry) | |
p.buffer[p.tail & p.mod] = entry | |
p.tail++ | |
return nil | |
} | |
func (p *RingBuffer) Consume() (Entry, error) { | |
if p.head - p.tail < 1 { | |
return nil, fmt.Errorf("There are no readable entry yet") | |
} | |
var entry Entry | |
for ; p.head < p.tail; p.head++ { | |
entry = p.buffer[p.head & p.mod] | |
if entry != nil { | |
break | |
} else { | |
runtime.Gosched() | |
} | |
} | |
p.head++ | |
return entry, nil | |
} | |
func (p *RingBuffer) persist(entry *rbentry) error { | |
filename := fmt.Sprintf("%s/db%.8x.dat", p.dbname, entry.index / MAX_LINES) | |
if fp, err := os.OpenFile(filename, os.O_CREATE | os.O_RDWR | os.O_APPEND, 0644); err != nil { | |
return err | |
} else { | |
var ( | |
flag uint8 = 0 | |
length int | |
nwrite int = 0 | |
data []byte | |
) | |
writer := bufio.NewWriter(fp) | |
if buf, err := snappy.Encode(nil, entry.data); err != nil { | |
data = entry.data | |
} else { | |
flag |= ENTRY_FLAG_COMPRESSED | |
data = buf | |
} | |
length = len(data) | |
binary.Write(writer, binary.LittleEndian, uint64(entry.index)) | |
binary.Write(writer, binary.LittleEndian, flag) | |
binary.Write(writer, binary.LittleEndian, uint32(length)) | |
for nwrite < length { | |
if n, err := writer.Write(data[nwrite:]); err != nil { break } else { nwrite += n } | |
} | |
writer.Flush() | |
fp.Close() | |
} | |
return nil | |
} | |
func (p *RingBuffer) dealt(entry *rbentry) error { | |
filename := fmt.Sprintf("%s/db%.8x.dat", p.dbname, entry.index / MAX_LINES) | |
if fp, err := os.OpenFile(filename, os.O_CREATE | os.O_RDWR | os.O_APPEND, 0644); err != nil { | |
return err | |
} else { | |
var flag uint8 = ENTRY_FLAG_DELETED | |
writer := bufio.NewWriter(fp) | |
binary.Write(writer, binary.LittleEndian, uint64(entry.index)) | |
binary.Write(writer, binary.LittleEndian, flag) | |
writer.Flush() | |
fp.Close() | |
if p.gcounter > MAX_LINES { | |
go p.gc() | |
p.gcounter = 0 | |
} else { | |
p.gcounter++ | |
} | |
} | |
return nil | |
} | |
func (p *RingBuffer) Status() string { | |
return fmt.Sprintf("%d/%d, head: %d, tail: %d", | |
p.tail - p.head, p.capacity, p.head, p.tail) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment