Last active
October 7, 2021 14:16
-
-
Save dillonstreator/ef0e9b3583f53e445688810d1e4972a5 to your computer and use it in GitHub Desktop.
Concurrently process items using a bounded channel, storing job results as they complete, shutting down gracefully, picking up where the previous run left off, and capturing all errors
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"math/rand" | |
"os" | |
"os/signal" | |
"strconv" | |
"sync" | |
"syscall" | |
"time" | |
"github.com/avast/retry-go" | |
"github.com/pkg/errors" | |
) | |
// Tunables for the demo job and the keys used in the persisted payload.
const (
	// itemCount is how many items are generated for processing.
	itemCount = 100
	// maxConcurrentItems is the capacity of the channel that bounds how many
	// items are processed at the same time.
	maxConcurrentItems = 15
	// maxPerItemRetries is the number of attempts passed to retry.Attempts.
	maxPerItemRetries = 3
	// maxRetryJitterInMs is the retry jitter ceiling, in milliseconds.
	maxRetryJitterInMs = 2500
	// maxProcessWaitTimeInMs is the exclusive upper bound, in milliseconds,
	// of the random sleep that simulates work.
	maxProcessWaitTimeInMs = 3000
	// failureRate is the threshold a random draw in [0, 1) must reach for a
	// simulated step to fail.
	failureRate = 0.95

	// payloadKeyCompletedAt is the payload key holding the completion time.
	payloadKeyCompletedAt = "completedAt"
	// payloadKeyCompletedMap is the payload key holding the completed items.
	payloadKeyCompletedMap = "completedMap"
)
func main() { | |
rand.Seed(time.Now().Unix()) | |
payload, err := getPayload() | |
if err != nil { | |
log.Fatal(err) | |
} | |
if _, ok := payload[payloadKeyCompletedAt]; ok { | |
log.Fatal("job already completed") | |
} | |
if _, ok := payload[payloadKeyCompletedMap].(map[string]interface{}); !ok { | |
payload[payloadKeyCompletedMap] = map[string]interface{}{} | |
} | |
ctx, cancel := context.WithCancel(context.Background()) | |
go func() { | |
ch := make(chan os.Signal, 1) | |
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) | |
<-ch | |
cancel() | |
}() | |
items := make([]string, itemCount) | |
for i := 0; i < itemCount; i++ { | |
items[i] = strconv.Itoa(i + 1) | |
} | |
boundedCh := make(chan struct{}, maxConcurrentItems) | |
wg := sync.WaitGroup{} | |
mu := sync.Mutex{} | |
errs := []error{} | |
syncAddErr := func(err error) { | |
mu.Lock() | |
defer mu.Unlock() | |
errs = append(errs, err) | |
} | |
var gracefulShutdownStart time.Time | |
for _, item := range items { | |
if _, ok := payload[payloadKeyCompletedMap].(map[string]interface{})[item]; ok { | |
fmt.Printf("skipping %s\n", item) | |
continue | |
} | |
boundedCh <- struct{}{} | |
if len(errs) > 0 { | |
gracefulShutdownStart = time.Now() | |
fmt.Println("stopping after allowing in flight processes to finish") | |
break | |
} | |
it := item | |
wg.Add(1) | |
go func() { | |
defer func() { | |
wg.Done() | |
<-boundedCh | |
}() | |
select { | |
case <-ctx.Done(): | |
syncAddErr(ctx.Err()) | |
return | |
default: | |
fmt.Printf("processing %s\n", it) | |
err := retry.Do(func() error { | |
return process(it) | |
}, retry.Attempts(maxPerItemRetries), retry.MaxJitter(time.Millisecond*maxRetryJitterInMs)) | |
if err != nil { | |
syncAddErr(errors.Wrap(err, fmt.Sprintf("processing %s", it))) | |
return | |
} | |
mu.Lock() | |
defer mu.Unlock() | |
payload[payloadKeyCompletedMap].(map[string]interface{})[it] = struct{}{} | |
err = setPayload(payload) | |
if err != nil { | |
syncAddErr(errors.Wrap(err, fmt.Sprintf("saving completed %s", it))) | |
return | |
} | |
fmt.Printf("successfully completed %s\n", it) | |
} | |
}() | |
} | |
wg.Wait() | |
if !gracefulShutdownStart.IsZero() { | |
fmt.Printf("remining inflight processes took %.2fs to finish\n", time.Since(gracefulShutdownStart).Seconds()) | |
} | |
if len(errs) > 0 { | |
log.Fatal(errs) | |
} | |
payload[payloadKeyCompletedAt] = time.Now() | |
err = setPayload(payload) | |
if err != nil { | |
log.Fatal(err) | |
} | |
fmt.Println("completed successfully") | |
} | |
func process(item string) error { | |
time.Sleep(time.Duration(rand.Intn(maxProcessWaitTimeInMs)) * time.Millisecond) | |
if rand.Float32() >= failureRate { | |
return errors.New("doing thing A failed") | |
} | |
if rand.Float32() >= failureRate { | |
return errors.New("doing thing B failed") | |
} | |
if rand.Float32() >= failureRate { | |
return errors.New("doing thing C failed") | |
} | |
return nil | |
} | |
// payloadFilename is the path of the JSON file that holds job state between
// runs.
var payloadFilename = "jobs.json"

// getPayload loads the saved job state from payloadFilename.
//
// A missing file is treated as a fresh start and yields an empty map. The
// returned map is never nil on success: a file containing the JSON literal
// null decodes without error into a nil map, which would make the caller's
// first assignment panic, so that case is also mapped to an empty map.
// Read and decode failures are returned wrapped with the filename.
func getPayload() (map[string]interface{}, error) {
	b, err := ioutil.ReadFile(payloadFilename)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return map[string]interface{}{}, nil
		}
		return nil, fmt.Errorf("reading %s: %w", payloadFilename, err)
	}

	var payload map[string]interface{}
	if err := json.Unmarshal(b, &payload); err != nil {
		return nil, fmt.Errorf("decoding %s: %w", payloadFilename, err)
	}
	if payload == nil {
		payload = map[string]interface{}{}
	}
	return payload, nil
}
func setPayload(payload map[string]interface{}) error { | |
jsonPayload, err := json.Marshal(payload) | |
if err != nil { | |
return err | |
} | |
return ioutil.WriteFile(payloadFilename, jsonPayload, os.ModePerm) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment