- eliminate ChunkStore interface
- eliminate dbapi, replace with netstore
- netstore takes contexts
- refactor chunker in its own package
- localstore does nothing with requests just stores Chunk interface objects (this leaves open the possibility that requests are just stored together with chunks)
- handleRetrieveRequest just uses netstore.Get
- neetData uses HasStored()
- chunk delivery loop uses netstore put
- requesting peer communicated via context.value
Last active
April 27, 2018 08:42
-
-
Save zelig/cad5b633438040840e61e2f61b3b73da to your computer and use it in GitHub Desktop.
chunk refactor proposal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type Address []byte | |
// LocalStore just implements a combined memory cache and disk Store | |
// it only stores values on disk if they are Marshalable | |
type LocalStore interface { | |
Get(ref Address) (Chunk, error) | |
Put(ch Chunk) (func() error, error) | |
} | |
// an interface for iteratable db store used by the syncer | |
type SyncerStore interface { | |
StorageCount(po int) int64 | |
Iterator(po int8, from, to int64) | |
} | |
type Marshalable interface { | |
Marshal() ([]byte, error) | |
} | |
// Chunk interface implemented by requests and data chunks | |
type Chunk interface { | |
Address() Address | |
Data() ([]byte, error) | |
Span() ([]byte, error) | |
Marshal() []byte | |
} | |
type chunk struct { | |
addr Address | |
sdata []byte | |
data []byte | |
metadata []byte | |
span int64 | |
} | |
func NewChunk(addr Address, data []byte) *Chunk { | |
return &chunk{ | |
addr: addr, | |
sdata: data, | |
} | |
} | |
// Request is created when a chunk is not found locally | |
// it starts a process of fetching once and keeps it | |
// alive until all active requests complete | |
// either by the chunk delivered or all cancelled/timed out | |
type Request struct { | |
chunk *chunk | |
deliveredC chan struct{} | |
quitC chan struct{} | |
init sync.Once | |
wg sync.WaitGroup | |
// Deliver([]byte) error | |
} | |
func NewRequest() *Request { | |
return &Request{ | |
deliveredC: make(chan struct{}), | |
quitC: make(chan struct{}), | |
} | |
} | |
// NetStore is the interface for | |
type NetStore interface { | |
Get(ctx context.Context, ref Address) (Chunk, func(ctx context.Context) (Chunk, error), error) | |
Put(ctx context.Context, ch Chunk) (func(ctx context.Context) error, error) | |
HasStored(ctx context.Context, ref Address) (func(context.Context) error, error) | |
} | |
// Get attempts at retrieving the chunk from LocalStore | |
// if it is not found, attempts at retrieving an existing requests | |
// if none exists, creates one and saves it in the requests cache | |
// From here on, all Get will hit on this request until the chunk is delivered | |
// or all request contexts are done | |
// it returns a chunk, a fetcher function and an error | |
// if chunk is nil, fetcher needs to be called with a context to return the chunk | |
func (n *NetStore) Get(ctx context.Context, ref Address) (Chunk, func(ctx context.Context) (Chunk, error), error) { | |
n.mu.Lock() | |
defer n.mu.Unlock() | |
chunk, err := n.LocalStore.Get(ctx, ref) | |
if err == nil { | |
return chunk, nil, nil | |
} | |
request, err := n.getOrCreateRequest(ref) | |
if err != nil { | |
return nil, nil, err | |
} | |
return nil, request.Run, nil | |
} | |
// getOrCreateRequest attempts at retrieving an existing requests | |
// if none exists, creates one and saves it in the requests cache | |
// caller must hold the lock | |
func (n *NetStore) getOrCreateRequest(ref Address) (*Request, error) { | |
r, err := n.requests.Get(ref) | |
if err == nil { | |
return r, err | |
} | |
r = NewRequest(n) | |
err = n.requests.Add(ref, r) | |
if err != nil { | |
return nil, err | |
} | |
return r, nil | |
} | |
// Run is a fetcher function to be called | |
// it launches the fetching only once by calling | |
// the retrieve function | |
func (r *Request) Run(ctx context.Context) (Chunk, error) { | |
r.wg.Add(1) | |
r.init.Do(func() { r.retrieve(ctx) }) | |
defer r.wg.Done() | |
select { | |
case <-ctx.Done(): | |
return nil, ctx.Err() | |
case <-r.ReqC: | |
return r.chunk, nil | |
} | |
} | |
// retrieve is called only once, it launches | |
// fetching by calling netstores retrieve function | |
// it keeps the request alive by rerequesting | |
// * after a search timeouted if request was successful | |
// * after retryInterval if request was unsuccessful | |
func (r *Request) retrieve(ctx context.Context) { | |
wait := time.NewTimer(0) | |
var quitC chan struct{} | |
var err error | |
// wait till all actual requests a closed | |
go func() { | |
r.wg.Wait() | |
close(r.quitC) | |
}() | |
// loop that keeps the request alive | |
go func() { | |
// remove the request from the cache when all requests | |
// contexts closed | |
defer func() { | |
r.netstore.requests.Remove(r.Ref) | |
}() | |
F: | |
for { | |
quitC, err = r.netstore.retrieve(ctx, r.ref) | |
if err != nil { | |
// retrieve error, wait before retry | |
wait.Reset(retryInterval) | |
} else { | |
// otherwise wait for response | |
wait.Reset(serchTimeout) | |
} | |
select { | |
case <-wait.C: | |
// search or retry timeout; rerequest | |
case <-quitC: | |
// requested downstream peer disconnected; rerequest | |
case <-r.quitC: | |
// all request context closed, can quit | |
break F | |
} | |
} | |
select { | |
case <-r.deliveredC: | |
default: | |
} | |
}() | |
} | |
// Put stores a chunk in localstore, manages the request for the chunk if exists | |
// by closing the ReqC channel | |
func (n *NetStore) Put(ctx context.Context, ch Chunk) (func(ctx context.Context) error, error) { | |
n.mu.Lock() | |
defer n.mu.Unlock() | |
defer func() { | |
r := n.requests.Get(ch.Ref) | |
if r != nil { | |
r.chunk = ch | |
close(r.deliveredC) | |
n.requests.Remove(ch.Ref) | |
} | |
}() | |
waitToStore, err := n.LocalStore.Put(ch) | |
if err != nil { | |
return nil, err | |
} | |
return waitToStore, nil | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Could this be in a separate package from other storage code? As far as I see it doesn't have to depend on storage, it just needs an interfaces for NetStore.