--- TicketCache.go
+++ BackfillCache.go
@@ -1,39 +1,19 @@
-/////////////////////////////////////////////////////////////////////
-/////////////////////////////////////////////////////////////////////
-
-// ticketCache unifies concurrent requests into a single cache update, and
+// cache unifies concurrent requests into a single cache update, and
// gives a safe view into that map cache.
-type ticketCache struct {
-    store statestore.Service
-
+type cache struct {
+    store statestore.Service
    requests chan *cacheRequest
-
    // Single item buffered channel. Holds a value when runQuery can be safely
    // started. Basically a channel/select friendly mutex around runQuery
    // running.
    startRunRequest chan struct{}
-
-    wg sync.WaitGroup
-
+    wg sync.WaitGroup
    // Multithreaded unsafe fields, only to be written by update, and read when
    // request given the ok.
-    tickets map[string]*pb.Ticket
-    err error
-}
-
-func newTicketCache(b *appmain.Bindings, cfg config.View) *ticketCache {
-    tc := &ticketCache{
-        store: statestore.New(cfg),
-        requests: make(chan *cacheRequest),
-        startRunRequest: make(chan struct{}, 1),
-        tickets: make(map[string]*pb.Ticket),
-    }
-
-    tc.startRunRequest <- struct{}{}
-    b.AddHealthCheckFunc(tc.store.HealthCheck)
-
-    return tc
+    value interface{}
+    update func(statestore.Service, interface{}) error
+    err error
}

type cacheRequest struct {
@@ -41,7 +21,7 @@
    runNow chan struct{}
}

-func (tc *ticketCache) request(ctx context.Context, f func(map[string]*pb.Ticket)) error {
+func (c *cache) request(ctx context.Context, f func(interface{})) error {
    cr := &cacheRequest{
        ctx: ctx,
        runNow: make(chan struct{}),
@@ -51,69 +31,73 @@
    for {
        select {
        case <-ctx.Done():
-            return errors.Wrap(ctx.Err(), "ticket cache request canceled before reuest sent.")
-        case <-tc.startRunRequest:
-            go tc.runRequest()
-        case tc.requests <- cr:
+            return errors.Wrap(ctx.Err(), "cache request canceled before request sent.")
+        case <-c.startRunRequest:
+            go c.runRequest()
+        case c.requests <- cr:
            break sendRequest
        }
    }

    select {
    case <-ctx.Done():
-        return errors.Wrap(ctx.Err(), "ticket cache request canceled waiting for access.")
+        return errors.Wrap(ctx.Err(), "cache request canceled waiting for access.")
    case <-cr.runNow:
-        defer tc.wg.Done()
-    }
-
-    if tc.err != nil {
-        return tc.err
-    }
-
-    f(tc.tickets)
+        defer c.wg.Done()
+    }
+
+    if c.err != nil {
+        return c.err
+    }
+
+    f(c.value)
    return nil
}
-func (tc *ticketCache) runRequest() {
+func (c *cache) runRequest() {
    defer func() {
-        tc.startRunRequest <- struct{}{}
+        c.startRunRequest <- struct{}{}
    }()

    // Wait for first query request.
-    reqs := []*cacheRequest{<-tc.requests}
+    reqs := []*cacheRequest{<-c.requests}

    // Collect all waiting queries.
collectAllWaiting:
    for {
        select {
-        case req := <-tc.requests:
+        case req := <-c.requests:
            reqs = append(reqs, req)
        default:
            break collectAllWaiting
        }
    }

-    tc.update()
+    c.err = c.update(c.store, c.value)
    stats.Record(context.Background(), cacheWaitingQueries.M(int64(len(reqs))))

-    // Send WaitGroup to query calls, letting them run their query on the ticket
-    // cache.
+    // Send WaitGroup to query calls, letting them run their query on the cache.
    for _, req := range reqs {
-        tc.wg.Add(1)
+        c.wg.Add(1)
        select {
        case req.runNow <- struct{}{}:
        case <-req.ctx.Done():
-            tc.wg.Done()
-        }
-    }
-
-    // wait for requests to finish using ticket cache.
-    tc.wg.Wait()
-}
-
-func (tc *ticketCache) update() {
-    st := time.Now()
-    previousCount := len(tc.tickets)
+            c.wg.Done()
+        }
+    }
+
+    // wait for requests to finish using cache.
+    c.wg.Wait()
+}
+
+func newTicketCache(b *appmain.Bindings, store statestore.Service) *cache {
+    c := &cache{
+        store: store,
+        requests: make(chan *cacheRequest),
+        startRunRequest: make(chan struct{}, 1),
+        value: make(map[string]*pb.Ticket),
+        update: updateTicketCache,
+    }
+    c.startRunRequest <- struct{}{}
+    b.AddHealthCheckFunc(c.store.HealthCheck)
@@ -135,40 +119,102 @@
    previousCount := len(tickets)
    currentAll, err := store.GetIndexedIDSet(context.Background())
    if err != nil {
-        tc.err = err
-        return
+        return err
    }

    deletedCount := 0
-    for id := range tc.tickets {
+    for id := range tickets {
        if _, ok := currentAll[id]; !ok {
-            delete(tc.tickets, id)
+            delete(tickets, id)
            deletedCount++
        }
    }

    toFetch := []string{}
-
    for id := range currentAll {
-        if _, ok := tc.tickets[id]; !ok {
+        if _, ok := tickets[id]; !ok {
            toFetch = append(toFetch, id)
        }
    }

-    newTickets, err := tc.store.GetTickets(context.Background(), toFetch)
-    if err != nil {
-        tc.err = err
-        return
+    newTickets, err := store.GetTickets(context.Background(), toFetch)
+    if err != nil {
+        return err
    }

    for _, t := range newTickets {
-        tc.tickets[t.Id] = t
+        tickets[t.Id] = t
    }

    stats.Record(context.Background(), cacheTotalItems.M(int64(previousCount)))
    stats.Record(context.Background(), cacheFetchedItems.M(int64(len(toFetch))))
-    stats.Record(context.Background(), cacheUpdateLatency.M(float64(time.Since(st))/float64(time.Millisecond)))
-
-    logger.Debugf("Ticket Cache update: Previous %d, Deleted %d, Fetched %d, Current %d", previousCount, deletedCount, len(toFetch), len(tc.tickets))
-    tc.err = nil
-}
+    stats.Record(context.Background(), cacheUpdateLatency.M(float64(time.Since(t))/float64(time.Millisecond)))
+
+    logger.Debugf("Ticket Cache update: Previous %d, Deleted %d, Fetched %d, Current %d", previousCount, deletedCount, len(toFetch), len(tickets))
+    return nil
+}
+
+func newBackfillCache(b *appmain.Bindings, store statestore.Service) *cache {
+    c := &cache{
+        store: store,
+        requests: make(chan *cacheRequest),
+        startRunRequest: make(chan struct{}, 1),
+        value: make(map[string]*pb.Backfill),
+        update: updateBackfillCache,
+    }
+
+    c.startRunRequest <- struct{}{}
+    b.AddHealthCheckFunc(c.store.HealthCheck)
+
+    return c
+}
+
+func updateBackfillCache(store statestore.Service, value interface{}) error {
+    if value == nil {
+        return fmt.Errorf("expecting not nil value")
+    }
+
+    backfills, ok := value.(map[string]*pb.Backfill)
+    if !ok {
+        return fmt.Errorf("expecting value type map[string]*pb.Backfill, but got: %T", value)
+    }
+
+    t := time.Now()
+    previousCount := len(backfills)
+    index, err := store.GetIndexedBackfills(context.Background())
+    if err != nil {
+        return err
+    }
+
+    deletedCount := 0
+    for id, backfill := range backfills {
+        generation, ok := index[id]
+        if !ok || backfill.Generation < int64(generation) {
+            delete(backfills, id)
+            deletedCount++
+        }
+    }
+
+    toFetch := []string{}
+    for id := range index {
+        if _, ok := backfills[id]; !ok {
+            toFetch = append(toFetch, id)
+        }
+    }
+
+    fetchedBackfills, err := store.GetBackfills(context.Background(), toFetch)
+    if err != nil {
+        return err
+    }
+
+    for _, b := range fetchedBackfills {
+        backfills[b.Id] = b
+    }
+
+    stats.Record(context.Background(), cacheTotalItems.M(int64(previousCount)))
+    stats.Record(context.Background(), cacheFetchedItems.M(int64(len(toFetch))))
+    stats.Record(context.Background(), cacheUpdateLatency.M(float64(time.Since(t))/float64(time.Millisecond)))
+
+    logger.Debugf("Backfill Cache update: Previous %d, Deleted %d, Fetched %d, Current %d", previousCount, deletedCount, len(toFetch), len(backfills))
+    return nil
+}
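
For context, callers in the same package read the shared map only through cache.request, which batches concurrent readers behind a single update and then hands each callback the cache's value field. The snippet below is a minimal sketch of that pattern, assuming the package's existing context and pb imports; the getBackfillIDs helper is hypothetical and only illustrates the type assertion a caller of the generalized cache would need.

// getBackfillIDs is an illustrative, hypothetical helper in the same package.
// cache.request blocks until the shared update has run, then invokes the
// callback with the cache's value while it is safe to read.
func getBackfillIDs(ctx context.Context, bc *cache) ([]string, error) {
    var ids []string
    err := bc.request(ctx, func(value interface{}) {
        // The backfill cache stores its state as map[string]*pb.Backfill.
        backfills, ok := value.(map[string]*pb.Backfill)
        if !ok {
            return // unexpected type; a real caller would record an error here
        }
        for id := range backfills {
            ids = append(ids, id)
        }
    })
    if err != nil {
        return nil, err
    }
    return ids, nil
}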