Skip to content

Instantly share code, notes, and snippets.

@btc
Created January 20, 2015 05:32
Show Gist options
  • Select an option

  • Save btc/cdc37b71975183f2d3cc to your computer and use it in GitHub Desktop.

Select an option

Save btc/cdc37b71975183f2d3cc to your computer and use it in GitHub Desktop.
package blockservice
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
"github.com/jbenet/go-ipfs/blocks"
"github.com/jbenet/go-ipfs/exchange"
)
type worker struct {
// added accepts blocks from client
added chan *blocks.Block
Exchange exchange.Interface
// workQueue is owned by the client worker
// process manages life-cycle
process process.Process
}
func newWorker(e exchange.Interface) *worker {
return &worker{
Exchange: e,
added: make(chan *blocks.Block, 10000),
process: process.WithParent(process.Background()), // internal management
}
}
func (w *worker) HasBlock(b *blocks.Block) error {
w.added <- b
return nil
}
func (w *worker) Start() {
workerChan := make(chan *blocks.Block)
// clientWorker handles incoming blocks from |w.added| and sends to
// |workerChan|. This will never block the client.
w.process.Go(func(proc process.Process) {
defer close(workerChan)
var workQueue Queue
for {
sendToWorker := workerChan
nextBlock := workQueue.Pop()
if nextBlock == nil {
sendToWorker = nil
}
select {
// if worker is ready and there's a block to process, send the
// block
case sendToWorker <- nextBlock:
case block := <-w.added:
// if the client sends another block, add it to the queue.
workQueue.Push(block)
case <-proc.Closing():
return
}
}
})
// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
ctx, cancel := context.WithCancel(context.Background())
// shuts down an in-progress HasBlock operation
proc.Go(func(proc process.Process) {
<-proc.Closing()
cancel()
})
for {
select {
case <-proc.Closing():
return
case block, ok := <-workerChan:
if !ok {
return
}
if err := w.Exchange.HasBlock(ctx, block); err != nil {
// TODO log event
}
}
}
})
}
func (w *worker) Close() error {
return w.process.Close()
}
type Queue struct {
elems []*blocks.Block
}
func (s *Queue) Push(b *blocks.Block) {
s.elems = append(s.elems, b)
}
func (s *Queue) Pop() *blocks.Block {
if len(s.elems) == 0 {
return nil
}
b := s.elems[0]
s.elems = s.elems[1:]
return b
}
func (s *Queue) Len() int {
return len(s.elems)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment