Last active
March 2, 2016 10:21
-
-
Save shaunlee/9265781 to your computer and use it in GitHub Desktop.
Recycle Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
"sync/atomic" | |
) | |
const ( | |
ITEM_DATA_SIZE = 4096 | |
) | |
const ( | |
ITEM_STATUS_WRITING = 1 << iota | |
ITEM_STATUS_READING | |
) | |
type Item struct { | |
index uint64 | |
status uint8 | |
data []byte | |
length int | |
queue *RecyleQueue | |
} | |
func NewItem(p *RecyleQueue) *Item { | |
return &Item{ | |
index: 0, | |
status: ITEM_STATUS_WRITING, | |
data: make([]byte, ITEM_DATA_SIZE), | |
length: 0, | |
queue: p, | |
} | |
} | |
func (p *Item) Data() []byte { | |
return p.data[:p.length] | |
} | |
func (p *Item) SetData(data []byte) { | |
p.length = len(data) | |
copy(p.data, data) | |
} | |
func (p *Item) Commit() { | |
switch p.status { | |
case ITEM_STATUS_WRITING: | |
// persist to file | |
p.queue.ready(p) | |
case ITEM_STATUS_READING: | |
// delete from file | |
p.queue.reuse(p) | |
} | |
} | |
func (p *Item) setWriting() { | |
p.status = ITEM_STATUS_WRITING | |
} | |
func (p *Item) setReading() { | |
p.status = ITEM_STATUS_READING | |
} | |
type RecyleQueue struct { | |
capacity, index uint64 | |
recyle, queue chan *Item | |
} | |
func NewRecyleQueue(capacity uint64) *RecyleQueue { | |
p := &RecyleQueue{ | |
capacity: capacity, | |
recyle: make(chan *Item, capacity), | |
queue: make(chan *Item, capacity), | |
} | |
// restore queue from file | |
return p | |
} | |
func (p *RecyleQueue) ready(item *Item) { p.queue <- item } | |
func (p *RecyleQueue) reuse(item *Item) { p.recyle <- item } | |
func (p *RecyleQueue) Produce() (*Item, error) { | |
if uint64(len(p.queue)) == p.capacity { | |
return nil, fmt.Errorf("There are no writable item yet") | |
} | |
var item *Item | |
select { | |
case item = <-p.recyle: | |
item.setWriting() | |
default: | |
item = NewItem(p) | |
} | |
item.index = atomic.AddUint64(&p.index, 1) - 1 | |
return item, nil | |
} | |
func (p *RecyleQueue) Consume() (*Item, error) { | |
select { | |
case item := <-p.queue: | |
item.setReading() | |
return item, nil | |
default: | |
return nil, fmt.Errorf("There are no readable entry yet") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment