Last active
February 25, 2025 22:26
-
-
Save wolfeidau/128ff492f9668a67520a32f47e10a9fe to your computer and use it in GitHub Desktop.
Golang Backpressure Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// The aim of this example is to illustrate backpressure using golang channels and go routines. | |
// | |
// This is the basis for a simple data processing service which could either be reading from | |
// some internal queue or a socket of some sort. | |
import ( | |
"fmt" | |
"math/rand" | |
"os" | |
"os/signal" | |
"sync" | |
"sync/atomic" | |
"syscall" | |
"time" | |
) | |
// how long in seconds events will be collected prior to writing | |
const batchTime = 3 | |
func init() { | |
// always seed the random | |
rand.Seed(time.Now().UnixNano()) | |
} | |
// RunContext which combines all the channels | |
type RunContext struct { | |
inChan chan Event | |
batchChan chan EventBatch | |
backChan chan EventBatch | |
doneChan chan bool | |
// used to report results at the end | |
sumProduced int64 | |
sumSent int64 | |
} | |
// NewRunContext build a new run context which holds | |
// all channels used in this pipeline | |
func NewRunContext() *RunContext { | |
return &RunContext{ | |
inChan: make(chan Event), | |
batchChan: make(chan EventBatch), | |
backChan: make(chan EventBatch, 1), | |
doneChan: make(chan bool), | |
} | |
} | |
// Event simple event | |
type Event int64 | |
// EventBatch which is just a slice of Event | |
type EventBatch []Event | |
// NewEventBatch make a new event empty batch | |
func NewEventBatch() EventBatch { return EventBatch{} } | |
// Merge given an event append it to the event batch | |
func (e EventBatch) Merge(other Event) EventBatch { return append(e, other) } | |
func produce(runCtx *RunContext) { | |
defer close(runCtx.inChan) | |
for { | |
// simulate some delay between groups of incoming events | |
delay := time.Duration(rand.Intn(5)+1) * time.Second | |
time.Sleep(delay) | |
nMessages := rand.Intn(10) + 1 | |
for i := 0; i < nMessages; i++ { | |
// generate a random value | |
e := Event(rand.Intn(10)) | |
fmt.Println("Producing:", e) | |
select { | |
case runCtx.inChan <- e: | |
atomic.AddInt64(&runCtx.sumProduced, int64(e)) // build a sum of the events produced | |
case <-runCtx.doneChan: | |
fmt.Println("producer complete") | |
return | |
} | |
} | |
} | |
} | |
func run(runCtx *RunContext, wg *sync.WaitGroup) { | |
defer wg.Done() | |
eventbatch := NewEventBatch() | |
// Collect events for the configured batch time | |
// this needs to be tuned based on how often you want to | |
// flush data to the writer | |
ticker := time.Tick(time.Duration(batchTime) * time.Second) | |
LOOP: | |
for { | |
select { | |
case ev, ok := <-runCtx.inChan: | |
if !ok { | |
if len(eventbatch) > 0 { | |
fmt.Println("Dispatching last batch") | |
runCtx.batchChan <- eventbatch | |
} | |
close(runCtx.batchChan) | |
fmt.Println("run finished") | |
break LOOP | |
} | |
eventbatch = eventbatch.Merge(ev) | |
case <-ticker: | |
if len(eventbatch) > 0 { | |
fmt.Println("Waiting to send") | |
runCtx.batchChan <- eventbatch | |
eventbatch = <-runCtx.backChan | |
} | |
} | |
} | |
} | |
func batchWriter(runCtx *RunContext, wg *sync.WaitGroup) { | |
defer wg.Done() | |
for { | |
eb, ok := <-runCtx.batchChan | |
if !ok { | |
fmt.Println("Batch writer complete") | |
break | |
} | |
// simulate time to persist the batch using a random delay | |
delay := time.Duration(rand.Intn(3)+1) * time.Second | |
time.Sleep(delay) | |
// We will need to retry if this write fails | |
// and add a exponential backoff | |
for _, e := range eb { | |
atomic.AddInt64(&runCtx.sumSent, int64(e)) | |
} | |
fmt.Println("Batch sent:", eb, delay) | |
runCtx.backChan <- NewEventBatch() | |
} | |
} | |
func waitOnSignal(runCtx *RunContext, sigs <-chan os.Signal) { | |
fmt.Println("awaiting signal") | |
sig := <-sigs | |
fmt.Println(sig) | |
// shut down input | |
close(runCtx.doneChan) | |
} | |
func main() { | |
var wg sync.WaitGroup | |
runCtx := NewRunContext() | |
sigs := make(chan os.Signal, 1) | |
// if you hit CTRL-C or kill the process this channel will | |
// get a signal and trigger a shutdown of the publisher | |
// which in turn should trigger a each step of the pipeline | |
// to exit | |
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) | |
go waitOnSignal(runCtx, sigs) | |
go produce(runCtx) | |
wg.Add(2) | |
go run(runCtx, &wg) | |
go batchWriter(runCtx, &wg) | |
wg.Wait() | |
fmt.Print("\nSummary\n") | |
fmt.Printf(" produced: %d\n", runCtx.sumProduced) | |
fmt.Printf(" sent: %d\n", runCtx.sumSent) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment