Created
January 20, 2015 05:32
-
-
Save btc/cdc37b71975183f2d3cc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package blockservice | |
import (
	"errors"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
	"github.com/jbenet/go-ipfs/blocks"
	"github.com/jbenet/go-ipfs/exchange"
)
| type worker struct { | |
| // added accepts blocks from client | |
| added chan *blocks.Block | |
| Exchange exchange.Interface | |
| // workQueue is owned by the client worker | |
| // process manages life-cycle | |
| process process.Process | |
| } | |
| func newWorker(e exchange.Interface) *worker { | |
| return &worker{ | |
| Exchange: e, | |
| added: make(chan *blocks.Block, 10000), | |
| process: process.WithParent(process.Background()), // internal management | |
| } | |
| } | |
| func (w *worker) HasBlock(b *blocks.Block) error { | |
| w.added <- b | |
| return nil | |
| } | |
| func (w *worker) Start() { | |
| workerChan := make(chan *blocks.Block) | |
| // clientWorker handles incoming blocks from |w.added| and sends to | |
| // |workerChan|. This will never block the client. | |
| w.process.Go(func(proc process.Process) { | |
| defer close(workerChan) | |
| var workQueue Queue | |
| for { | |
| sendToWorker := workerChan | |
| nextBlock := workQueue.Pop() | |
| if nextBlock == nil { | |
| sendToWorker = nil | |
| } | |
| select { | |
| // if worker is ready and there's a block to process, send the | |
| // block | |
| case sendToWorker <- nextBlock: | |
| case block := <-w.added: | |
| // if the client sends another block, add it to the queue. | |
| workQueue.Push(block) | |
| case <-proc.Closing(): | |
| return | |
| } | |
| } | |
| }) | |
| // reads from |workerChan| until process closes | |
| w.process.Go(func(proc process.Process) { | |
| ctx, cancel := context.WithCancel(context.Background()) | |
| // shuts down an in-progress HasBlock operation | |
| proc.Go(func(proc process.Process) { | |
| <-proc.Closing() | |
| cancel() | |
| }) | |
| for { | |
| select { | |
| case <-proc.Closing(): | |
| return | |
| case block, ok := <-workerChan: | |
| if !ok { | |
| return | |
| } | |
| if err := w.Exchange.HasBlock(ctx, block); err != nil { | |
| // TODO log event | |
| } | |
| } | |
| } | |
| }) | |
| } | |
| func (w *worker) Close() error { | |
| return w.process.Close() | |
| } | |
| type Queue struct { | |
| elems []*blocks.Block | |
| } | |
| func (s *Queue) Push(b *blocks.Block) { | |
| s.elems = append(s.elems, b) | |
| } | |
| func (s *Queue) Pop() *blocks.Block { | |
| if len(s.elems) == 0 { | |
| return nil | |
| } | |
| b := s.elems[0] | |
| s.elems = s.elems[1:] | |
| return b | |
| } | |
| func (s *Queue) Len() int { | |
| return len(s.elems) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment